1use crate::authentication::authenticate;
2use crate::debug::chain_config::ChainConfigRequest;
3use crate::debug::execution_witness::ExecutionWitnessRequest;
4use crate::debug::execution_witness_by_hash::ExecutionWitnessByBlockHashRequest;
5use crate::engine::blobs::{BlobsV2Request, BlobsV3Request};
6use crate::engine::client_version::GetClientVersionV1Request;
7use crate::engine::payload::{
8 GetPayloadV5Request, GetPayloadV6Request, NewPayloadV5Request, NewPayloadWithWitnessV5Request,
9};
10use crate::engine::{
11 ExchangeCapabilitiesRequest,
12 blobs::BlobsV1Request,
13 exchange_transition_config::ExchangeTransitionConfigV1Req,
14 fork_choice::{
15 ForkChoiceUpdatedV1, ForkChoiceUpdatedV2, ForkChoiceUpdatedV3, ForkChoiceUpdatedV4,
16 },
17 payload::{
18 GetPayloadBodiesByHashV1Request, GetPayloadBodiesByHashV2Request,
19 GetPayloadBodiesByRangeV1Request, GetPayloadBodiesByRangeV2Request, GetPayloadV1Request,
20 GetPayloadV2Request, GetPayloadV3Request, GetPayloadV4Request, NewPayloadV1Request,
21 NewPayloadV2Request, NewPayloadV3Request, NewPayloadV4Request,
22 },
23};
24use crate::eth::client::Config;
25use crate::eth::{
26 account::{
27 GetBalanceRequest, GetCodeRequest, GetProofRequest, GetStorageAtRequest,
28 GetTransactionCountRequest,
29 },
30 block::{
31 BlockNumberRequest, GetBlobBaseFee, GetBlockByHashRequest, GetBlockByNumberRequest,
32 GetBlockReceiptsRequest, GetBlockTransactionCountRequest, GetRawBlockRequest,
33 GetRawHeaderRequest, GetRawReceipts,
34 },
35 block_access_list::BlockAccessListRequest,
36 client::{ChainId, Syncing},
37 fee_market::FeeHistoryRequest,
38 filter::{self, ActiveFilters, DeleteFilterRequest, FilterChangesRequest, NewFilterRequest},
39 gas_price::GasPrice,
40 gas_tip_estimator::GasTipEstimator,
41 logs::LogsFilter,
42 transaction::{
43 CallRequest, CreateAccessListRequest, EstimateGasRequest, GetRawTransaction,
44 GetTransactionByBlockHashAndIndexRequest, GetTransactionByBlockNumberAndIndexRequest,
45 GetTransactionByHashRequest, GetTransactionReceiptRequest,
46 },
47};
48use crate::subscription_manager::{SubscriptionManager, SubscriptionManagerProtocol};
49use crate::tracing::{TraceBlockByNumberRequest, TraceTransactionRequest};
50use crate::types::transaction::SendRawTransactionRequest;
51use crate::utils::{
52 RpcErr, RpcErrorMetadata, RpcErrorResponse, RpcNamespace, RpcRequest, RpcRequestId,
53 RpcSuccessResponse,
54};
55use crate::{admin, net};
56use crate::{eth, mempool};
57use axum::extract::ws::{Message, WebSocket};
58use axum::extract::{DefaultBodyLimit, State, WebSocketUpgrade};
59use axum::{Json, Router, http::StatusCode, routing::post};
60use axum_extra::{
61 TypedHeader,
62 headers::{Authorization, authorization::Bearer},
63};
64use bytes::Bytes;
65use ethrex_blockchain::Blockchain;
66use ethrex_blockchain::error::ChainError;
67use ethrex_common::types::Block;
68use ethrex_common::types::block_access_list::BlockAccessList;
69use ethrex_common::types::block_execution_witness::ExecutionWitness;
70use ethrex_metrics::rpc::{RpcOutcome, record_async_duration, record_rpc_outcome};
71use ethrex_p2p::peer_handler::PeerHandler;
72use ethrex_p2p::sync_manager::SyncManager;
73use ethrex_p2p::types::Node;
74use ethrex_p2p::types::NodeRecord;
75use ethrex_storage::Store;
76use serde::Deserialize;
77use serde_json::Value;
78use spawned_concurrency::tasks::ActorRef;
79use std::{
80 collections::{HashMap, HashSet},
81 future::IntoFuture,
82 net::SocketAddr,
83 sync::{Arc, Mutex},
84 time::Duration,
85};
86use tokio::net::TcpListener;
87use tokio::sync::{
88 Mutex as TokioMutex,
89 mpsc::{UnboundedSender, unbounded_channel},
90 oneshot,
91};
92use tokio::time::timeout;
93use tower_http::cors::CorsLayer;
94use tracing::{error, info, warn};
95use tracing_subscriber::{EnvFilter, Registry, reload};
96
97#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
98use axum::response::IntoResponse;
99#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
101pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
102 let Some(mutex) = jemalloc_pprof::PROF_CTL.as_ref() else {
103 return Err((
104 StatusCode::NOT_IMPLEMENTED,
105 "jemalloc profiling is not available".into(),
106 ));
107 };
108 let mut prof_ctl = mutex.lock().await;
109 require_profiling_activated(&prof_ctl)?;
110 let pprof = prof_ctl
111 .dump_pprof()
112 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
113 Ok(pprof)
114}
115
116#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
118fn require_profiling_activated(
119 prof_ctl: &jemalloc_pprof::JemallocProfCtl,
120) -> Result<(), (StatusCode, String)> {
121 if prof_ctl.activated() {
122 Ok(())
123 } else {
124 Err((
125 axum::http::StatusCode::FORBIDDEN,
126 "heap profiling not activated".into(),
127 ))
128 }
129}
130
131#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
132pub async fn handle_get_heap_flamegraph() -> Result<impl IntoResponse, (StatusCode, String)> {
133 use axum::body::Body;
134 use axum::http::header::CONTENT_TYPE;
135 use axum::response::Response;
136
137 let Some(mutex) = jemalloc_pprof::PROF_CTL.as_ref() else {
138 return Err((
139 StatusCode::NOT_IMPLEMENTED,
140 "jemalloc profiling is not available".into(),
141 ));
142 };
143 let mut prof_ctl = mutex.lock().await;
144 require_profiling_activated(&prof_ctl)?;
145 let svg = prof_ctl
146 .dump_flamegraph()
147 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
148 Response::builder()
149 .header(CONTENT_TYPE, "image/svg+xml")
150 .body(Body::from(svg))
151 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
152}
153
154#[cfg(not(all(feature = "jemalloc_profiling", target_os = "linux")))]
156pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
157 Err((
158 StatusCode::NOT_IMPLEMENTED,
159 "jemalloc profiling is not available (build with `ethrex-rpc/jemalloc_profiling`, it only works on linux)".into(),
160 ))
161}
162
163#[cfg(not(all(feature = "jemalloc_profiling", target_os = "linux")))]
164pub async fn handle_get_heap_flamegraph() -> Result<(), (StatusCode, String)> {
165 Err((
166 StatusCode::NOT_IMPLEMENTED,
167 "jemalloc profiling is not available (build with `ethrex-rpc/jemalloc_profiling`, it only works on linux)".into(),
168 ))
169}
170
171#[derive(Deserialize)]
176#[serde(untagged)]
177pub enum RpcRequestWrapper {
178 Single(RpcRequest),
180 Multiple(Vec<RpcRequest>),
182}
183
184type BlockWorkerMessage = (
186 oneshot::Sender<Result<Option<ExecutionWitness>, ChainError>>,
187 Block,
188 Option<BlockAccessList>,
189 bool,
190);
191
192#[derive(Clone)]
197pub struct RpcApiContext {
198 pub storage: Store,
200 pub blockchain: Arc<Blockchain>,
202 pub active_filters: ActiveFilters,
204 pub syncer: Option<Arc<SyncManager>>,
206 pub peer_handler: Option<PeerHandler>,
208 pub node_data: NodeData,
210 pub gas_tip_estimator: Arc<TokioMutex<GasTipEstimator>>,
212 pub log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
214 pub gas_ceil: u64,
216 pub block_worker_channel: UnboundedSender<BlockWorkerMessage>,
218 pub ws: Option<WebSocketConfig>,
220 pub allowed_namespaces: Arc<HashSet<RpcNamespace>>,
226}
227
228#[derive(Clone)]
230pub struct WebSocketConfig {
231 pub addr: SocketAddr,
233 pub subscription_manager: ActorRef<SubscriptionManager>,
235}
236
237impl std::fmt::Debug for RpcApiContext {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 let mut s = f.debug_struct("RpcApiContext");
240 s.field("storage", &self.storage)
241 .field("blockchain", &self.blockchain)
242 .field("syncer", &self.syncer.as_ref().map(|_| ".."))
243 .field("peer_handler", &self.peer_handler.as_ref().map(|_| ".."))
244 .field("gas_ceil", &self.gas_ceil);
245 s.finish()
246 }
247}
248
249#[derive(Debug, Clone)]
256pub struct ClientVersion {
257 pub name: String,
259 pub version: String,
261 pub branch: String,
263 pub commit: String,
265 pub os_arch: String,
267 pub rustc_version: String,
269 formatted: String,
271}
272
273impl ClientVersion {
274 pub fn new(
276 name: String,
277 version: String,
278 branch: String,
279 commit: String,
280 os_arch: String,
281 rustc_version: String,
282 ) -> Self {
283 let formatted = format!(
284 "{}/v{}-{}-{}/{}/rustc-v{}",
285 name, version, branch, commit, os_arch, rustc_version
286 );
287 Self {
288 name,
289 version,
290 branch,
291 commit,
292 os_arch,
293 rustc_version,
294 formatted,
295 }
296 }
297}
298
299impl std::fmt::Display for ClientVersion {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 f.write_str(&self.formatted)
302 }
303}
304
305#[derive(Debug, Clone)]
310pub struct NodeData {
311 pub jwt_secret: Bytes,
313 pub local_p2p_node: Node,
315 pub local_node_record: NodeRecord,
317 pub client_version: ClientVersion,
319 pub extra_data: Bytes,
321}
322
323#[allow(async_fn_in_trait)]
353pub trait RpcHandler: Sized {
354 fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr>;
358
359 async fn call(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
364 let request = Self::parse(&req.params)?;
365 let namespace = match req.namespace() {
366 Ok(RpcNamespace::Engine) => "engine",
367 _ => "rpc",
368 };
369 let method = req.method.as_str();
370
371 let result =
372 record_async_duration(
373 namespace,
374 method,
375 async move { request.handle(context).await },
376 )
377 .await;
378
379 let outcome = match &result {
380 Ok(_) => RpcOutcome::Success,
381 Err(err) => RpcOutcome::Error(get_error_kind(err)),
382 };
383 record_rpc_outcome(namespace, method, outcome);
384
385 result
386 }
387
388 async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr>;
392}
393
394fn get_error_kind(err: &RpcErr) -> &'static str {
395 match err {
396 RpcErr::MethodNotFound(_) => "MethodNotFound",
397 RpcErr::WrongParam(_) => "WrongParam",
398 RpcErr::BadParams(_) => "BadParams",
399 RpcErr::InvalidRequest(_) => "InvalidRequest",
400 RpcErr::MissingParam(_) => "MissingParam",
401 RpcErr::TooLargeRequest => "TooLargeRequest",
402 RpcErr::BadHexFormat(_) => "BadHexFormat",
403 RpcErr::UnsupportedFork(_) => "UnsupportedFork",
404 RpcErr::Internal(_) => "Internal",
405 RpcErr::Vm(_) => "Vm",
406 RpcErr::Revert { .. } => "Revert",
407 RpcErr::Halt { .. } => "Halt",
408 RpcErr::AuthenticationError(_) => "AuthenticationError",
409 RpcErr::InvalidForkChoiceState(_) => "InvalidForkChoiceState",
410 RpcErr::InvalidPayloadAttributes(_) => "InvalidPayloadAttributes",
411 RpcErr::TooDeepReorg(_) => "TooDeepReorg",
412 RpcErr::UnknownPayload(_) => "UnknownPayload",
413 RpcErr::InvalidProofFormat(_) => "InvalidProofFormat",
414 RpcErr::InvalidHeaderFormat(_) => "InvalidHeaderFormat",
415 RpcErr::InvalidPayload(_) => "InvalidPayload",
416 RpcErr::ProofGenerationUnavailable(_) => "ProofGenerationUnavailable",
417 }
418}
419
420pub const FILTER_DURATION: Duration = {
426 if cfg!(test) {
427 Duration::from_secs(1)
428 } else {
429 Duration::from_secs(5 * 60)
430 }
431};
432
433pub fn start_block_executor(blockchain: Arc<Blockchain>) -> UnboundedSender<BlockWorkerMessage> {
449 let (block_worker_channel, mut block_receiver) = unbounded_channel::<BlockWorkerMessage>();
450 std::thread::Builder::new()
451 .name("block_executor".to_string())
452 .spawn(move || {
453 while let Some((notify, block, bal, make_witness)) = block_receiver.blocking_recv() {
454 let result = (|| {
455 if make_witness {
456 let witness =
457 blockchain.add_block_pipeline_with_witness(block, bal.as_ref())?;
458 Ok(Some(witness))
459 } else {
460 blockchain.add_block_pipeline(block, bal.as_ref())?;
461 Ok(None)
462 }
463 })();
464 let _ = notify
465 .send(result)
466 .inspect_err(|_| tracing::error!("failed to notify caller"));
467 }
468 })
469 .expect("Falied to spawn block_executor thread");
470 block_worker_channel
471}
472
473#[allow(clippy::too_many_arguments)]
514pub async fn start_api(
515 http_addr: SocketAddr,
516 ws: Option<WebSocketConfig>,
517 authrpc_addr: SocketAddr,
518 storage: Store,
519 blockchain: Arc<Blockchain>,
520 jwt_secret: Bytes,
521 local_p2p_node: Node,
522 local_node_record: NodeRecord,
523 syncer: SyncManager,
524 peer_handler: PeerHandler,
525 client_version: ClientVersion,
526 log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
527 gas_ceil: u64,
528 extra_data: String,
529 allowed_namespaces: HashSet<RpcNamespace>,
530) -> Result<(), RpcErr> {
531 let active_filters = Arc::new(Mutex::new(HashMap::new()));
534 let block_worker_channel = start_block_executor(blockchain.clone());
535 let service_context = RpcApiContext {
536 storage,
537 blockchain,
538 active_filters: active_filters.clone(),
539 syncer: Some(Arc::new(syncer)),
540 peer_handler: Some(peer_handler),
541 node_data: NodeData {
542 jwt_secret,
543 local_p2p_node,
544 local_node_record,
545 client_version,
546 extra_data: extra_data.into(),
547 },
548 gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())),
549 log_filter_handler,
550 gas_ceil,
551 block_worker_channel,
552 ws: ws.clone(),
553 allowed_namespaces: Arc::new(allowed_namespaces),
554 };
555
556 tokio::task::spawn(async move {
558 let mut interval = tokio::time::interval(FILTER_DURATION);
559 let filters = active_filters.clone();
560 loop {
561 interval.tick().await;
562 tracing::debug!("Running filter clean task");
563 filter::clean_outdated_filters(filters.clone(), FILTER_DURATION);
564 tracing::debug!("Filter clean task complete");
565 }
566 });
567
568 let cors = CorsLayer::permissive();
573
574 let http_router = Router::new()
575 .route("/debug/pprof/allocs", axum::routing::get(handle_get_heap))
576 .route(
577 "/debug/pprof/allocs/flamegraph",
578 axum::routing::get(handle_get_heap_flamegraph),
579 )
580 .route("/", post(handle_http_request))
581 .layer(cors.clone())
582 .with_state(service_context.clone());
583 let http_listener = TcpListener::bind(http_addr)
584 .await
585 .map_err(|error| RpcErr::Internal(error.to_string()))?;
586 let http_server = axum::serve(http_listener, http_router)
587 .with_graceful_shutdown(shutdown_signal())
588 .into_future();
589 info!("Starting HTTP server at {http_addr}");
590
591 let (timer_sender, mut timer_receiver) = tokio::sync::watch::channel(());
592
593 tokio::spawn(async move {
594 loop {
595 let result = timeout(Duration::from_secs(30), timer_receiver.changed()).await;
596 if result.is_err() {
597 warn!("No messages from the consensus layer. Is the consensus client running?");
598 }
599 }
600 });
601
602 let authrpc_handler = move |ctx, auth, body| async move {
603 let _ = timer_sender.send(());
604 handle_authrpc_request(ctx, auth, body).await
605 };
606
607 let authrpc_router = Router::new()
608 .route("/", post(authrpc_handler))
609 .with_state(service_context.clone())
610 .layer(DefaultBodyLimit::max(256 * 1024 * 1024));
613
614 let authrpc_listener = TcpListener::bind(authrpc_addr)
615 .await
616 .map_err(|error| RpcErr::Internal(error.to_string()))?;
617 let authrpc_server = axum::serve(authrpc_listener, authrpc_router)
618 .with_graceful_shutdown(shutdown_signal())
619 .into_future();
620 info!("Starting Auth-RPC server at {authrpc_addr}");
621
622 if let Some(ref ws_config) = ws {
623 let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
624 ws.on_upgrade(|mut socket| async move {
625 handle_websocket(&mut socket, &ctx, |req| {
626 let c = ctx.clone();
627 async move { map_http_requests(&req, c).await }
628 })
629 .await;
630 })
631 };
632 let ws_router = Router::new()
633 .route("/", axum::routing::any(ws_handler))
634 .layer(cors)
635 .with_state(service_context);
636 let ws_listener = TcpListener::bind(ws_config.addr)
637 .await
638 .map_err(|error| RpcErr::Internal(error.to_string()))?;
639 let ws_server = axum::serve(ws_listener, ws_router)
640 .with_graceful_shutdown(shutdown_signal())
641 .into_future();
642 info!("Starting WS server at {}", ws_config.addr);
643
644 let _ = tokio::try_join!(authrpc_server, http_server, ws_server)
645 .inspect_err(|e| error!("Error shutting down servers: {e:?}"));
646 } else {
647 let _ = tokio::try_join!(authrpc_server, http_server)
648 .inspect_err(|e| error!("Error shutting down servers: {e:?}"));
649 }
650
651 Ok(())
652}
653
654pub async fn shutdown_signal() {
658 tokio::signal::ctrl_c()
659 .await
660 .expect("failed to install Ctrl+C handler");
661}
662
663const MAX_BATCH_SIZE: usize = 1000;
668
669fn null_id_error(err: RpcErr) -> Value {
673 let meta: RpcErrorMetadata = err.into();
674 serde_json::json!({
675 "jsonrpc": "2.0",
676 "id": Value::Null,
677 "error": meta,
678 })
679}
680
681fn validate_batch(wrapper: &RpcRequestWrapper) -> Option<Value> {
685 let RpcRequestWrapper::Multiple(requests) = wrapper else {
686 return None;
687 };
688 if requests.is_empty() {
689 return Some(null_id_error(RpcErr::InvalidRequest(
690 "empty batch is not a valid Request".to_string(),
691 )));
692 }
693 if requests.len() > MAX_BATCH_SIZE {
694 return Some(null_id_error(RpcErr::InvalidRequest(format!(
695 "batch too large: {} > {MAX_BATCH_SIZE}",
696 requests.len()
697 ))));
698 }
699 None
700}
701
702pub(crate) async fn handle_http_request(
703 State(service_context): State<RpcApiContext>,
704 body: String,
705) -> Result<Json<Value>, StatusCode> {
706 let wrapper: RpcRequestWrapper = match serde_json::from_str(&body) {
707 Ok(w) => w,
708 Err(_) => {
709 return Ok(Json(
710 rpc_response(
711 RpcRequestId::String("".to_string()),
712 Err(RpcErr::BadParams("Invalid request body".to_string())),
713 )
714 .map_err(|_| StatusCode::BAD_REQUEST)?,
715 ));
716 }
717 };
718
719 if let Some(err) = validate_batch(&wrapper) {
720 return Ok(Json(err));
721 }
722
723 let res = match wrapper {
724 RpcRequestWrapper::Single(request) => {
725 let res = map_http_requests(&request, service_context).await;
726 rpc_response(request.id, res).map_err(|_| StatusCode::BAD_REQUEST)?
727 }
728 RpcRequestWrapper::Multiple(requests) => {
729 let mut responses = Vec::with_capacity(requests.len());
730 for req in requests {
731 let res = map_http_requests(&req, service_context.clone()).await;
732 responses.push(rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?);
733 }
734 serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
735 }
736 };
737 Ok(Json(res))
738}
739
740pub async fn handle_authrpc_request(
741 State(service_context): State<RpcApiContext>,
742 auth_header: Option<TypedHeader<Authorization<Bearer>>>,
743 body: String,
744) -> Result<Json<Value>, StatusCode> {
745 let wrapper: RpcRequestWrapper = match serde_json::from_str(&body) {
746 Ok(w) => w,
747 Err(_) => {
748 return Ok(Json(null_id_error(RpcErr::InvalidRequest(
749 "could not parse JSON-RPC request body".to_string(),
750 ))));
751 }
752 };
753
754 if let Some(err) = validate_batch(&wrapper) {
757 return Ok(Json(err));
758 }
759
760 if let Err(error) = authenticate(&service_context.node_data.jwt_secret, auth_header) {
761 let error_meta: RpcErrorMetadata = error.into();
765 let res = match wrapper {
766 RpcRequestWrapper::Single(req) => serde_json::json!({
767 "jsonrpc": "2.0",
768 "id": req.id,
769 "error": error_meta,
770 }),
771 RpcRequestWrapper::Multiple(requests) => {
772 let mut responses = Vec::with_capacity(requests.len());
773 for req in requests {
774 responses.push(serde_json::json!({
775 "jsonrpc": "2.0",
776 "id": req.id,
777 "error": error_meta.clone(),
778 }));
779 }
780 serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
781 }
782 };
783 return Ok(Json(res));
784 }
785
786 let res = match wrapper {
787 RpcRequestWrapper::Single(req) => {
788 let res = map_authrpc_requests(&req, service_context).await;
789 rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?
790 }
791 RpcRequestWrapper::Multiple(requests) => {
792 let mut responses = Vec::with_capacity(requests.len());
793 for req in requests {
794 let res = map_authrpc_requests(&req, service_context.clone()).await;
795 responses.push(rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?);
796 }
797 serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
798 }
799 };
800 Ok(Json(res))
801}
802
803pub async fn handle_websocket<F, Fut, E>(
812 socket: &mut WebSocket,
813 context: &RpcApiContext,
814 route_request: F,
815) where
816 F: Fn(RpcRequest) -> Fut,
817 Fut: std::future::Future<Output = Result<Value, E>>,
818 E: Into<RpcErrorMetadata>,
819{
820 let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<String>(
821 crate::subscription_manager::SUBSCRIBER_CHANNEL_CAPACITY,
822 );
823 let mut subscription_ids: Vec<String> = Vec::new();
827
828 loop {
829 tokio::select! {
830 msg = socket.recv() => {
831 let Some(msg) = msg else { break };
832 let body = match msg {
833 Ok(Message::Text(text)) => text.to_string(),
834 Ok(Message::Close(_)) => break,
835 Ok(_) => continue,
836 Err(_) => break,
837 };
838
839 let response = handle_ws_request(
840 &body, context, &out_tx, &mut subscription_ids, &route_request,
841 ).await;
842 if let Some(resp) = response
843 && socket.send(Message::Text(resp.into())).await.is_err()
844 {
845 break;
846 }
847 }
848
849 Some(msg) = out_rx.recv() => {
850 if socket.send(Message::Text(msg.into())).await.is_err() {
851 break;
852 }
853 }
854 }
855 }
856
857 if let Some(ws) = &context.ws {
858 for id in subscription_ids {
859 let _ = ws.subscription_manager.unsubscribe(id).await;
860 }
861 }
862}
863
864async fn handle_ws_request<F, Fut, E>(
865 body: &str,
866 context: &RpcApiContext,
867 out_tx: &tokio::sync::mpsc::Sender<String>,
868 subscription_ids: &mut Vec<String>,
869 route_request: &F,
870) -> Option<String>
871where
872 F: Fn(RpcRequest) -> Fut,
873 Fut: std::future::Future<Output = Result<Value, E>>,
874 E: Into<RpcErrorMetadata>,
875{
876 let parsed: Value = match serde_json::from_str(body) {
880 Ok(v) => v,
881 Err(_) => return Some(ws_error_response(None, -32700, "Parse error")),
882 };
883
884 let wrapper: RpcRequestWrapper = match serde_json::from_value(parsed) {
886 Ok(w) => w,
887 Err(_) => return Some(ws_error_response(None, -32600, "Invalid Request")),
888 };
889
890 match wrapper {
891 RpcRequestWrapper::Single(req) => {
892 let resp =
893 process_ws_request(req, context, out_tx, subscription_ids, route_request).await?;
894 Some(resp.to_string())
895 }
896 RpcRequestWrapper::Multiple(reqs) => {
897 if reqs.is_empty() {
899 return Some(ws_error_response(None, -32600, "Invalid Request"));
900 }
901 let mut responses = Vec::with_capacity(reqs.len());
902 for req in reqs {
903 if let Some(resp) =
904 process_ws_request(req, context, out_tx, subscription_ids, route_request).await
905 {
906 responses.push(resp);
907 }
908 }
909 if responses.is_empty() {
910 None
911 } else {
912 serde_json::to_string(&responses).ok()
913 }
914 }
915 }
916}
917
918async fn process_ws_request<F, Fut, E>(
919 req: RpcRequest,
920 context: &RpcApiContext,
921 out_tx: &tokio::sync::mpsc::Sender<String>,
922 subscription_ids: &mut Vec<String>,
923 route_request: &F,
924) -> Option<Value>
925where
926 F: Fn(RpcRequest) -> Fut,
927 Fut: std::future::Future<Output = Result<Value, E>>,
928 E: Into<RpcErrorMetadata>,
929{
930 match req.method.as_str() {
931 "eth_subscribe" | "eth_unsubscribe" => {
932 if !context.allowed_namespaces.contains(&RpcNamespace::Eth) {
937 let err: Result<Value, RpcErr> = Err(RpcErr::MethodNotFound(req.method.clone()));
938 return rpc_response(req.id, err).ok();
939 }
940 let result = if req.method == "eth_subscribe" {
941 handle_eth_subscribe(&req, context, out_tx, subscription_ids).await
942 } else {
943 handle_eth_unsubscribe(&req, context, subscription_ids).await
944 };
945 rpc_response(req.id, result).ok()
946 }
947 _ => {
948 let id = req.id.clone();
949 let res = route_request(req).await;
950 rpc_response(id, res).ok()
951 }
952 }
953}
954
955fn ws_error_response(id: Option<RpcRequestId>, code: i32, message: &str) -> String {
958 let id = match id {
959 Some(id) => serde_json::to_value(id).unwrap_or(Value::Null),
960 None => Value::Null,
961 };
962 serde_json::json!({
963 "jsonrpc": "2.0",
964 "id": id,
965 "error": { "code": code, "message": message },
966 })
967 .to_string()
968}
969
970pub async fn handle_eth_subscribe(
975 req: &crate::utils::RpcRequest,
976 context: &RpcApiContext,
977 out_tx: &tokio::sync::mpsc::Sender<String>,
978 subscription_ids: &mut Vec<String>,
979) -> Result<Value, RpcErr> {
980 use crate::subscription_manager::MAX_SUBSCRIPTIONS_PER_CONNECTION;
981
982 let params = req.params.as_deref().unwrap_or(&[]);
983 let sub_type = params.first().and_then(|v| v.as_str()).ok_or_else(|| {
984 RpcErr::BadParams("eth_subscribe requires a subscription type parameter".to_string())
985 })?;
986
987 if subscription_ids.len() >= MAX_SUBSCRIPTIONS_PER_CONNECTION {
988 return Err(RpcErr::BadParams(format!(
989 "Too many subscriptions (max {MAX_SUBSCRIPTIONS_PER_CONNECTION})"
990 )));
991 }
992
993 match sub_type {
994 "newHeads" => {
995 let ws = context
996 .ws
997 .as_ref()
998 .ok_or_else(|| RpcErr::Internal("WebSocket server not enabled".to_string()))?;
999
1000 let id = ws
1001 .subscription_manager
1002 .subscribe(out_tx.clone())
1003 .await
1004 .map_err(|e| RpcErr::Internal(format!("Subscription failed: {e}")))?
1005 .ok_or_else(|| RpcErr::Internal("Global subscription cap reached".to_string()))?;
1006
1007 subscription_ids.push(id.clone());
1008 Ok(Value::String(id))
1009 }
1010 other => Err(RpcErr::BadParams(format!(
1011 "Unsupported subscription type: {other}"
1012 ))),
1013 }
1014}
1015
1016pub async fn handle_eth_unsubscribe(
1021 req: &crate::utils::RpcRequest,
1022 context: &RpcApiContext,
1023 subscription_ids: &mut Vec<String>,
1024) -> Result<Value, RpcErr> {
1025 let params = req.params.as_deref().unwrap_or(&[]);
1026 let sub_id = params
1027 .first()
1028 .and_then(|v| v.as_str())
1029 .ok_or_else(|| {
1030 RpcErr::BadParams("eth_unsubscribe requires a subscription ID parameter".to_string())
1031 })?
1032 .to_string();
1033
1034 let Some(pos) = subscription_ids.iter().position(|id| id == &sub_id) else {
1036 return Ok(Value::Bool(false));
1037 };
1038
1039 let removed = if let Some(ref ws) = context.ws {
1040 ws.subscription_manager
1041 .unsubscribe(sub_id)
1042 .await
1043 .unwrap_or(false)
1044 } else {
1045 false
1046 };
1047
1048 if removed {
1049 subscription_ids.swap_remove(pos);
1050 }
1051
1052 Ok(Value::Bool(removed))
1053}
1054
1055pub async fn map_http_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
1057 let namespace = match req.namespace() {
1058 Ok(ns) => ns,
1059 Err(rpc_err) => return Err(rpc_err),
1060 };
1061 if !context.allowed_namespaces.contains(&namespace) {
1062 return Err(RpcErr::MethodNotFound(req.method.clone()));
1063 }
1064 match namespace {
1065 RpcNamespace::Eth => map_eth_requests(req, context).await,
1066 RpcNamespace::Admin => map_admin_requests(req, context).await,
1067 RpcNamespace::Debug => map_debug_requests(req, context).await,
1068 RpcNamespace::Web3 => map_web3_requests(req, context),
1069 RpcNamespace::Net => map_net_requests(req, context).await,
1070 RpcNamespace::Mempool => map_mempool_requests(req, context),
1071 RpcNamespace::Engine => Err(RpcErr::MethodNotFound(req.method.clone())),
1076 }
1077}
1078
1079pub async fn map_authrpc_requests(
1081 req: &RpcRequest,
1082 context: RpcApiContext,
1083) -> Result<Value, RpcErr> {
1084 match req.namespace() {
1085 Ok(RpcNamespace::Engine) => map_engine_requests(req, context).await,
1086 Ok(RpcNamespace::Eth) => map_eth_requests(req, context).await,
1087 _ => Err(RpcErr::MethodNotFound(req.method.clone())),
1088 }
1089}
1090
1091pub async fn map_eth_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
1101 match req.method.as_str() {
1102 "eth_chainId" => ChainId::call(req, context).await,
1103 "eth_syncing" => Syncing::call(req, context).await,
1104 "eth_getBlockByNumber" => GetBlockByNumberRequest::call(req, context).await,
1105 "eth_getBlockByHash" => GetBlockByHashRequest::call(req, context).await,
1106 "eth_getBalance" => GetBalanceRequest::call(req, context).await,
1107 "eth_getCode" => GetCodeRequest::call(req, context).await,
1108 "eth_getStorageAt" => GetStorageAtRequest::call(req, context).await,
1109 "eth_getBlockTransactionCountByNumber" => {
1110 GetBlockTransactionCountRequest::call(req, context).await
1111 }
1112 "eth_getBlockTransactionCountByHash" => {
1113 GetBlockTransactionCountRequest::call(req, context).await
1114 }
1115 "eth_getTransactionByBlockNumberAndIndex" => {
1116 GetTransactionByBlockNumberAndIndexRequest::call(req, context).await
1117 }
1118 "eth_getTransactionByBlockHashAndIndex" => {
1119 GetTransactionByBlockHashAndIndexRequest::call(req, context).await
1120 }
1121 "eth_getBlockReceipts" => GetBlockReceiptsRequest::call(req, context).await,
1122 "eth_getBlockAccessList" => BlockAccessListRequest::call(req, context).await,
1123 "eth_getTransactionByHash" => GetTransactionByHashRequest::call(req, context).await,
1124 "eth_getTransactionReceipt" => GetTransactionReceiptRequest::call(req, context).await,
1125 "eth_createAccessList" => CreateAccessListRequest::call(req, context).await,
1126 "eth_blockNumber" => BlockNumberRequest::call(req, context).await,
1127 "eth_call" => CallRequest::call(req, context).await,
1128 "eth_blobBaseFee" => GetBlobBaseFee::call(req, context).await,
1129 "eth_getTransactionCount" => GetTransactionCountRequest::call(req, context).await,
1130 "eth_feeHistory" => FeeHistoryRequest::call(req, context).await,
1131 "eth_estimateGas" => EstimateGasRequest::call(req, context).await,
1132 "eth_getLogs" => LogsFilter::call(req, context).await,
1133 "eth_newFilter" => {
1134 NewFilterRequest::stateful_call(req, context.storage, context.active_filters).await
1135 }
1136 "eth_uninstallFilter" => {
1137 DeleteFilterRequest::stateful_call(req, context.storage, context.active_filters)
1138 }
1139 "eth_getFilterChanges" => {
1140 FilterChangesRequest::stateful_call(req, context.storage, context.active_filters).await
1141 }
1142 "eth_sendRawTransaction" => SendRawTransactionRequest::call(req, context).await,
1143 "eth_getProof" => GetProofRequest::call(req, context).await,
1144 "eth_gasPrice" => GasPrice::call(req, context).await,
1145 "eth_maxPriorityFeePerGas" => {
1146 eth::max_priority_fee::MaxPriorityFee::call(req, context).await
1147 }
1148 "eth_config" => Config::call(req, context).await,
1149 unknown_eth_method => Err(RpcErr::MethodNotFound(unknown_eth_method.to_owned())),
1150 }
1151}
1152
1153pub async fn map_debug_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
1160 match req.method.as_str() {
1161 "debug_getRawHeader" => GetRawHeaderRequest::call(req, context).await,
1162 "debug_getRawBlock" => GetRawBlockRequest::call(req, context).await,
1163 "debug_getRawTransaction" => GetRawTransaction::call(req, context).await,
1164 "debug_getRawReceipts" => GetRawReceipts::call(req, context).await,
1165 "debug_executionWitness" => ExecutionWitnessRequest::call(req, context).await,
1166 "debug_executionWitnessByBlockHash" => {
1167 ExecutionWitnessByBlockHashRequest::call(req, context).await
1168 }
1169 "debug_chainConfig" => ChainConfigRequest::call(req, context).await,
1170 "debug_traceTransaction" => TraceTransactionRequest::call(req, context).await,
1171 "debug_traceBlockByNumber" => TraceBlockByNumberRequest::call(req, context).await,
1172 unknown_debug_method => Err(RpcErr::MethodNotFound(unknown_debug_method.to_owned())),
1173 }
1174}
1175
1176pub async fn map_engine_requests(
1189 req: &RpcRequest,
1190 context: RpcApiContext,
1191) -> Result<Value, RpcErr> {
1192 match req.method.as_str() {
1193 "engine_exchangeCapabilities" => ExchangeCapabilitiesRequest::call(req, context).await,
1194 "engine_forkchoiceUpdatedV1" => ForkChoiceUpdatedV1::call(req, context).await,
1195 "engine_forkchoiceUpdatedV2" => ForkChoiceUpdatedV2::call(req, context).await,
1196 "engine_forkchoiceUpdatedV3" => ForkChoiceUpdatedV3::call(req, context).await,
1197 "engine_forkchoiceUpdatedV4" => ForkChoiceUpdatedV4::call(req, context).await,
1198 "engine_newPayloadWithWitnessV5" => {
1206 Box::pin(NewPayloadWithWitnessV5Request::call(req, context)).await
1207 }
1208 "engine_newPayloadV5" => Box::pin(NewPayloadV5Request::call(req, context)).await,
1209 "engine_newPayloadV4" => Box::pin(NewPayloadV4Request::call(req, context)).await,
1210 "engine_newPayloadV3" => Box::pin(NewPayloadV3Request::call(req, context)).await,
1211 "engine_newPayloadV2" => NewPayloadV2Request::call(req, context).await,
1212 "engine_newPayloadV1" => NewPayloadV1Request::call(req, context).await,
1213 "engine_exchangeTransitionConfigurationV1" => {
1214 ExchangeTransitionConfigV1Req::call(req, context).await
1215 }
1216 "engine_getPayloadV6" => GetPayloadV6Request::call(req, context).await,
1217 "engine_getPayloadV5" => GetPayloadV5Request::call(req, context).await,
1218 "engine_getPayloadV4" => GetPayloadV4Request::call(req, context).await,
1219 "engine_getPayloadV3" => GetPayloadV3Request::call(req, context).await,
1220 "engine_getPayloadV2" => GetPayloadV2Request::call(req, context).await,
1221 "engine_getPayloadV1" => GetPayloadV1Request::call(req, context).await,
1222 "engine_getPayloadBodiesByHashV1" => {
1223 GetPayloadBodiesByHashV1Request::call(req, context).await
1224 }
1225 "engine_getPayloadBodiesByRangeV1" => {
1226 GetPayloadBodiesByRangeV1Request::call(req, context).await
1227 }
1228 "engine_getPayloadBodiesByHashV2" => {
1229 GetPayloadBodiesByHashV2Request::call(req, context).await
1230 }
1231 "engine_getPayloadBodiesByRangeV2" => {
1232 GetPayloadBodiesByRangeV2Request::call(req, context).await
1233 }
1234 "engine_getBlobsV1" => BlobsV1Request::call(req, context).await,
1235 "engine_getBlobsV2" => BlobsV2Request::call(req, context).await,
1236 "engine_getBlobsV3" => BlobsV3Request::call(req, context).await,
1237 "engine_getClientVersionV1" => GetClientVersionV1Request::call(req, context).await,
1238 unknown_engine_method => Err(RpcErr::MethodNotFound(unknown_engine_method.to_owned())),
1239 }
1240}
1241
1242pub async fn map_admin_requests(
1243 req: &RpcRequest,
1244 mut context: RpcApiContext,
1245) -> Result<Value, RpcErr> {
1246 match req.method.as_str() {
1247 "admin_nodeInfo" => admin::node_info(context.storage, &context.node_data).await,
1248 "admin_peers" => admin::peers(&mut context).await,
1249 "admin_peerScores" => admin::peer_scores(&mut context).await,
1250 "admin_syncStatus" => admin::sync_status(&mut context).await,
1251 "admin_setLogLevel" => admin::set_log_level(req, &context.log_filter_handler),
1252 "admin_addPeer" => admin::add_peer(&mut context, req).await,
1253 unknown_admin_method => Err(RpcErr::MethodNotFound(unknown_admin_method.to_owned())),
1254 }
1255}
1256
1257pub fn map_web3_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
1258 match req.method.as_str() {
1259 "web3_clientVersion" => Ok(Value::String(context.node_data.client_version.to_string())),
1260 unknown_web3_method => Err(RpcErr::MethodNotFound(unknown_web3_method.to_owned())),
1261 }
1262}
1263
1264pub async fn map_net_requests(req: &RpcRequest, contex: RpcApiContext) -> Result<Value, RpcErr> {
1265 match req.method.as_str() {
1266 "net_version" => net::version(req, contex),
1267 "net_peerCount" => net::peer_count(req, contex).await,
1268 unknown_net_method => Err(RpcErr::MethodNotFound(unknown_net_method.to_owned())),
1269 }
1270}
1271
1272pub fn map_mempool_requests(req: &RpcRequest, contex: RpcApiContext) -> Result<Value, RpcErr> {
1273 match req.method.as_str() {
1274 "txpool_content" => mempool::content(contex),
1276 "txpool_contentFrom" => mempool::content_from(&req.params, contex),
1277 "txpool_status" => mempool::status(contex),
1278 "txpool_inspect" => mempool::inspect(contex),
1279 unknown_mempool_method => Err(RpcErr::MethodNotFound(unknown_mempool_method.to_owned())),
1280 }
1281}
1282
1283pub fn rpc_response<E>(id: RpcRequestId, res: Result<Value, E>) -> Result<Value, RpcErr>
1297where
1298 E: Into<RpcErrorMetadata>,
1299{
1300 Ok(match res {
1301 Ok(result) => serde_json::to_value(RpcSuccessResponse {
1302 id,
1303 jsonrpc: "2.0".to_string(),
1304 result,
1305 }),
1306 Err(error) => serde_json::to_value(RpcErrorResponse {
1307 id,
1308 jsonrpc: "2.0".to_string(),
1309 error: error.into(),
1310 }),
1311 }?)
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316 use super::*;
1317 use crate::test_utils::default_context_with_storage;
1318 use ethrex_common::{
1319 H160,
1320 types::{BlockHeader, ChainConfig, Genesis},
1321 };
1322 use ethrex_crypto::keccak::keccak_hash;
1323 use ethrex_storage::{EngineType, Store};
1324 use std::io::BufReader;
1325 use std::str::FromStr;
1326 use std::{fs::File, path::Path};
1327
1328 #[tokio::test]
1332 async fn http_api_allowlist_blocks_debug_namespace_by_default() {
1333 let body = r#"{"jsonrpc":"2.0","method":"debug_traceTransaction","params":["0x0"],"id":1}"#;
1334 let request: RpcRequest = serde_json::from_str(body).unwrap();
1335 let mut storage =
1336 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1337 storage
1338 .set_chain_config(&example_chain_config())
1339 .await
1340 .unwrap();
1341 let mut context = default_context_with_storage(storage).await;
1342 context.allowed_namespaces = Arc::new(crate::DEFAULT_HTTP_API.iter().copied().collect());
1343
1344 let result = map_http_requests(&request, context).await;
1345 match result {
1346 Err(RpcErr::MethodNotFound(method)) => {
1347 assert_eq!(method, "debug_traceTransaction");
1348 }
1349 other => panic!("expected MethodNotFound, got {other:?}"),
1350 }
1351 }
1352
1353 #[tokio::test]
1355 async fn http_api_allowlist_default_routes_standard_namespaces() {
1356 let mut storage =
1357 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1358 storage
1359 .set_chain_config(&example_chain_config())
1360 .await
1361 .unwrap();
1362 let mut context = default_context_with_storage(storage).await;
1363 context.allowed_namespaces = Arc::new(crate::DEFAULT_HTTP_API.iter().copied().collect());
1364
1365 for method in ["eth_chainId", "net_version", "web3_clientVersion"] {
1366 let body = format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":[],"id":1}}"#);
1367 let request: RpcRequest = serde_json::from_str(&body).unwrap();
1368 let result = map_http_requests(&request, context.clone()).await;
1369 assert!(
1370 !matches!(result, Err(RpcErr::MethodNotFound(_))),
1371 "default allowlist should route {method}, got {result:?}"
1372 );
1373 }
1374 }
1375
1376 #[tokio::test]
1380 async fn ws_subscribe_blocked_when_eth_namespace_disabled() {
1381 let body = r#"{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}"#;
1382 let request: RpcRequest = serde_json::from_str(body).unwrap();
1383 let mut storage =
1384 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1385 storage
1386 .set_chain_config(&example_chain_config())
1387 .await
1388 .unwrap();
1389 let mut context = default_context_with_storage(storage).await;
1390 let mut without_eth: HashSet<RpcNamespace> = crate::test_utils::all_namespaces_for_tests();
1392 without_eth.remove(&RpcNamespace::Eth);
1393 context.allowed_namespaces = Arc::new(without_eth);
1394
1395 let (out_tx, _out_rx) = tokio::sync::mpsc::channel::<String>(1);
1396 let mut subscription_ids: Vec<String> = Vec::new();
1397 let route_request = |_req: RpcRequest| async move {
1398 panic!(
1399 "route_request must not be called for eth_subscribe when the namespace is disabled"
1400 );
1401 #[allow(unreachable_code)]
1402 Ok::<Value, RpcErr>(Value::Null)
1403 };
1404
1405 let response = process_ws_request(
1406 request,
1407 &context,
1408 &out_tx,
1409 &mut subscription_ids,
1410 &route_request,
1411 )
1412 .await
1413 .expect("process_ws_request should return an error response");
1414
1415 let err = response.get("error").expect("expected error field");
1416 assert_eq!(
1417 err.get("code").and_then(|v| v.as_i64()),
1418 Some(-32601),
1419 "expected MethodNotFound (-32601), got {response}"
1420 );
1421 assert!(
1422 subscription_ids.is_empty(),
1423 "no subscription should have been registered"
1424 );
1425 }
1426
1427 #[tokio::test]
1431 async fn engine_namespace_rejected_on_http() {
1432 let body = r#"{"jsonrpc":"2.0","method":"engine_forkchoiceUpdatedV3","params":[],"id":1}"#;
1433 let request: RpcRequest = serde_json::from_str(body).unwrap();
1434 let mut storage =
1435 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1436 storage
1437 .set_chain_config(&example_chain_config())
1438 .await
1439 .unwrap();
1440 let mut context = default_context_with_storage(storage).await;
1441 let mut all_with_engine: HashSet<RpcNamespace> =
1442 crate::test_utils::all_namespaces_for_tests();
1443 all_with_engine.insert(RpcNamespace::Engine);
1444 context.allowed_namespaces = Arc::new(all_with_engine);
1445
1446 let result = map_http_requests(&request, context).await;
1447 assert!(matches!(result, Err(RpcErr::MethodNotFound(_))));
1448 }
1449
1450 fn to_rpc_response_success_value(str: &str) -> serde_json::Value {
1453 serde_json::to_value(serde_json::from_str::<RpcSuccessResponse>(str).unwrap()).unwrap()
1454 }
1455
1456 #[tokio::test]
1457 async fn admin_nodeinfo_request() {
1458 let body = r#"{"jsonrpc":"2.0", "method":"admin_nodeInfo", "params":[], "id":1}"#;
1459 let request: RpcRequest = serde_json::from_str(body).unwrap();
1460 let mut storage =
1461 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1462 storage
1463 .set_chain_config(&example_chain_config())
1464 .await
1465 .unwrap();
1466 let context = default_context_with_storage(storage).await;
1467 let local_p2p_node = context.node_data.local_p2p_node.clone();
1468
1469 let enr_url = context.node_data.local_node_record.enr_url().unwrap();
1470 let result = map_http_requests(&request, context).await;
1471 let rpc_response = rpc_response(request.id, result).unwrap();
1472 let blob_schedule = serde_json::json!({
1473 "cancun": { "baseFeeUpdateFraction": 3338477, "max": 6, "target": 3, },
1474 "prague": { "baseFeeUpdateFraction": 5007716, "max": 9, "target": 6, },
1475 "osaka": { "baseFeeUpdateFraction": 5007716, "max": 9, "target": 6, },
1476 "bpo1": { "baseFeeUpdateFraction": 8346193, "max": 15, "target": 10, },
1477 "bpo2": { "baseFeeUpdateFraction": 11684671, "max": 21, "target": 14, },
1478 });
1479 let default_hash = BlockHeader::default().hash();
1481 let json = serde_json::json!({
1482 "jsonrpc": "2.0",
1483 "id": 1,
1484 "result": {
1485 "enode": "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@127.0.0.1:30303",
1486 "enr": enr_url,
1487 "id": hex::encode(keccak_hash(local_p2p_node.public_key)),
1488 "ip": "127.0.0.1",
1489 "listenAddr": "127.0.0.1:30303",
1490 "name": "ethrex/v0.1.0-test-abcd1234/x86_64-unknown-linux/rustc-v1.70.0",
1491 "ports": {
1492 "discovery": 30303,
1493 "listener": 30303
1494 },
1495 "protocols": {
1496 "eth": {
1497 "network": 3151908,
1498 "genesis": default_hash,
1499 "config": {
1500 "chainId": 3151908,
1501 "homesteadBlock": 0,
1502 "daoForkBlock": null,
1503 "daoForkSupport": false,
1504 "eip150Block": 0,
1505 "eip155Block": 0,
1506 "eip158Block": 0,
1507 "byzantiumBlock": 0,
1508 "constantinopleBlock": 0,
1509 "petersburgBlock": 0,
1510 "istanbulBlock": 0,
1511 "muirGlacierBlock": null,
1512 "berlinBlock": 0,
1513 "londonBlock": 0,
1514 "arrowGlacierBlock": null,
1515 "grayGlacierBlock": null,
1516 "mergeNetsplitBlock": 0,
1517 "shanghaiTime": 0,
1518 "cancunTime": 0,
1519 "pragueTime": 1718232101,
1520 "verkleTime": null,
1521 "osakaTime": null,
1522 "bpo1Time": null,
1523 "bpo2Time": null,
1524 "bpo3Time": null,
1525 "bpo4Time": null,
1526 "bpo5Time": null,
1527 "amsterdamTime": null,
1528 "terminalTotalDifficulty": "0x0",
1529 "terminalTotalDifficultyPassed": true,
1530 "blobSchedule": blob_schedule,
1531 "depositContractAddress": H160::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap(),
1532 "enableVerkleAtGenesis": false,
1533 },
1534 "head": default_hash,
1535 }
1536 },
1537 }
1538 });
1539 let expected_response = to_rpc_response_success_value(&json.to_string());
1540 assert_eq!(rpc_response.to_string(), expected_response.to_string())
1541 }
1542
1543 fn read_execution_api_genesis_file() -> Genesis {
1545 let file = File::open("../../../fixtures/genesis/execution-api.json")
1546 .expect("Failed to open genesis file");
1547 let reader = BufReader::new(file);
1548 serde_json::from_reader(reader).expect("Failed to deserialize genesis file")
1549 }
1550
1551 #[tokio::test]
1552 async fn create_access_list_simple_transfer() {
1553 let body = r#"{"jsonrpc":"2.0","id":1,"method":"eth_createAccessList","params":[{"from":"0x0c2c51a0990aee1d73c1228de158688341557508","nonce":"0x0","to":"0x0100000000000000000000000000000000000000","value":"0xa"},"0x00"]}"#;
1556 let request: RpcRequest = serde_json::from_str(body).unwrap();
1557 let mut storage =
1559 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1560 let genesis = read_execution_api_genesis_file();
1561 storage
1562 .add_initial_state(genesis)
1563 .await
1564 .expect("Failed to add genesis block to DB");
1565 let context = default_context_with_storage(storage).await;
1567 let result = map_http_requests(&request, context).await;
1568 let response = rpc_response(request.id, result).unwrap();
1569 let expected_response = to_rpc_response_success_value(
1570 r#"{"jsonrpc":"2.0","id":1,"result":{"accessList":[],"gasUsed":"0x5208"}}"#,
1571 );
1572 assert_eq!(response.to_string(), expected_response.to_string());
1573 }
1574
1575 fn example_chain_config() -> ChainConfig {
1576 ChainConfig {
1577 chain_id: 3151908_u64,
1578 homestead_block: Some(0),
1579 eip150_block: Some(0),
1580 eip155_block: Some(0),
1581 eip158_block: Some(0),
1582 byzantium_block: Some(0),
1583 constantinople_block: Some(0),
1584 petersburg_block: Some(0),
1585 istanbul_block: Some(0),
1586 berlin_block: Some(0),
1587 london_block: Some(0),
1588 merge_netsplit_block: Some(0),
1589 shanghai_time: Some(0),
1590 cancun_time: Some(0),
1591 prague_time: Some(1718232101),
1592 terminal_total_difficulty: Some(0),
1593 terminal_total_difficulty_passed: true,
1594 deposit_contract_address: H160::from_str("0x00000000219ab540356cbb839cbe05303d7705fa")
1595 .unwrap(),
1596 ..Default::default()
1597 }
1598 }
1599
1600 #[tokio::test]
1604 async fn admin_nodeinfo_large_terminal_total_difficulty() {
1605 let mainnet_ttd: u128 = 58_750_000_000_000_000_000_000;
1608
1609 let body = r#"{"jsonrpc":"2.0", "method":"admin_nodeInfo", "params":[], "id":1}"#;
1610 let request: RpcRequest = serde_json::from_str(body).unwrap();
1611 let mut storage =
1612 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1613 let mut config = example_chain_config();
1614 config.terminal_total_difficulty = Some(mainnet_ttd);
1615 storage.set_chain_config(&config).await.unwrap();
1616 let context = default_context_with_storage(storage).await;
1617
1618 let result = map_http_requests(&request, context).await;
1619 assert!(
1620 result.is_ok(),
1621 "admin_nodeInfo should not fail with large terminal_total_difficulty"
1622 );
1623
1624 let value = result.unwrap();
1625 let ttd = value
1626 .pointer("/protocols/eth/config/terminalTotalDifficulty")
1627 .expect("terminalTotalDifficulty should be present in response");
1628 assert_eq!(ttd.as_str().unwrap(), "0xc70d808a128d7380000");
1630 }
1631
1632 #[tokio::test]
1633 async fn net_version_test() {
1634 let body = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id":67}"#;
1635 let request: RpcRequest = serde_json::from_str(body).expect("serde serialization failed");
1636 let mut storage =
1638 Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
1639 storage
1640 .set_chain_config(&example_chain_config())
1641 .await
1642 .unwrap();
1643 let chain_id = storage.get_chain_config().chain_id.to_string();
1644 let context = default_context_with_storage(storage).await;
1645 let result = map_http_requests(&request, context).await;
1647 let response = rpc_response(request.id, result).unwrap();
1648 let expected_response_string =
1649 format!(r#"{{"id":67,"jsonrpc": "2.0","result": "{chain_id}"}}"#);
1650 let expected_response = to_rpc_response_success_value(&expected_response_string);
1651 assert_eq!(response.to_string(), expected_response.to_string());
1652 }
1653
1654 #[tokio::test]
1655 async fn eth_config_request_cancun_with_prague_scheduled() {
1656 let body = r#"{"jsonrpc":"2.0", "method":"eth_config", "params":[], "id":1}"#;
1657 let request: RpcRequest = serde_json::from_str(body).unwrap();
1658 let storage = Store::new_from_genesis(
1659 Path::new("temp.db"),
1660 EngineType::InMemory,
1661 "../../../cmd/ethrex/networks/hoodi/genesis.json",
1662 )
1663 .await
1664 .expect("Failed to create test DB");
1665 let context = default_context_with_storage(storage).await;
1666 let result = map_http_requests(&request, context).await;
1667 let rpc_response = rpc_response(request.id, result).unwrap();
1668 let json = serde_json::json!({
1669 "id": 1,
1670 "jsonrpc": "2.0",
1671 "result": {
1672 "current": {
1673 "activationTime": 0,
1674 "blobSchedule": {
1675 "baseFeeUpdateFraction": 3338477,
1676 "max": 6,
1677 "target": 3
1678 },
1679 "chainId": "0x88bb0",
1680 "forkId": "0xbef71d30",
1681 "precompiles": {
1682 "BLAKE2F": "0x0000000000000000000000000000000000000009",
1683 "BN254_ADD": "0x0000000000000000000000000000000000000006",
1684 "BN254_MUL": "0x0000000000000000000000000000000000000007",
1685 "BN254_PAIRING": "0x0000000000000000000000000000000000000008",
1686 "ECREC": "0x0000000000000000000000000000000000000001",
1687 "ID": "0x0000000000000000000000000000000000000004",
1688 "KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
1689 "MODEXP": "0x0000000000000000000000000000000000000005",
1690 "RIPEMD160": "0x0000000000000000000000000000000000000003",
1691 "SHA256": "0x0000000000000000000000000000000000000002"
1692 },
1693 "systemContracts": {
1694 "BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02"
1695 }
1696 },
1697 "next": {
1698 "activationTime": 1742999832,
1699 "blobSchedule": {
1700 "baseFeeUpdateFraction": 5007716,
1701 "max": 9,
1702 "target": 6
1703 },
1704 "chainId": "0x88bb0",
1705 "forkId": "0x0929e24e",
1706 "precompiles": {
1707 "BLAKE2F": "0x0000000000000000000000000000000000000009",
1708 "BLS12_G1ADD": "0x000000000000000000000000000000000000000b",
1709 "BLS12_G1MSM": "0x000000000000000000000000000000000000000c",
1710 "BLS12_G2ADD": "0x000000000000000000000000000000000000000d",
1711 "BLS12_G2MSM": "0x000000000000000000000000000000000000000e",
1712 "BLS12_MAP_FP2_TO_G2": "0x0000000000000000000000000000000000000011",
1713 "BLS12_MAP_FP_TO_G1": "0x0000000000000000000000000000000000000010",
1714 "BLS12_PAIRING_CHECK": "0x000000000000000000000000000000000000000f",
1715 "BN254_ADD": "0x0000000000000000000000000000000000000006",
1716 "BN254_MUL": "0x0000000000000000000000000000000000000007",
1717 "BN254_PAIRING": "0x0000000000000000000000000000000000000008",
1718 "ECREC": "0x0000000000000000000000000000000000000001",
1719 "ID": "0x0000000000000000000000000000000000000004",
1720 "KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
1721 "MODEXP": "0x0000000000000000000000000000000000000005",
1722 "RIPEMD160": "0x0000000000000000000000000000000000000003",
1723 "SHA256": "0x0000000000000000000000000000000000000002"
1724 },
1725 "systemContracts": {
1726 "BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02",
1727 "CONSOLIDATION_REQUEST_PREDEPLOY_ADDRESS": "0x0000bbddc7ce488642fb579f8b00f3a590007251",
1728 "DEPOSIT_CONTRACT_ADDRESS": "0x00000000219ab540356cbb839cbe05303d7705fa",
1729 "HISTORY_STORAGE_ADDRESS": "0x0000f90827f1c53a10cb7a02335b175320002935",
1730 "WITHDRAWAL_REQUEST_PREDEPLOY_ADDRESS": "0x00000961ef480eb55e80d19ad83579a64c007002"
1731 }
1732 },
1733 "last": {
1734 "activationTime": 1762955544,
1735 "blobSchedule": {
1736 "baseFeeUpdateFraction": 11684671,
1737 "max": 21,
1738 "target": 14,
1739 },
1740 "chainId": "0x88bb0",
1741 "forkId": "0x23aa1351",
1742 "precompiles": {
1743 "BLAKE2F": "0x0000000000000000000000000000000000000009",
1744 "BLS12_G1ADD": "0x000000000000000000000000000000000000000b",
1745 "BLS12_G1MSM": "0x000000000000000000000000000000000000000c",
1746 "BLS12_G2ADD": "0x000000000000000000000000000000000000000d",
1747 "BLS12_G2MSM": "0x000000000000000000000000000000000000000e",
1748 "BLS12_MAP_FP2_TO_G2": "0x0000000000000000000000000000000000000011",
1749 "BLS12_MAP_FP_TO_G1": "0x0000000000000000000000000000000000000010",
1750 "BLS12_PAIRING_CHECK": "0x000000000000000000000000000000000000000f",
1751 "BN254_ADD": "0x0000000000000000000000000000000000000006",
1752 "BN254_MUL": "0x0000000000000000000000000000000000000007",
1753 "BN254_PAIRING": "0x0000000000000000000000000000000000000008",
1754 "ECREC": "0x0000000000000000000000000000000000000001",
1755 "ID": "0x0000000000000000000000000000000000000004",
1756 "KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
1757 "MODEXP": "0x0000000000000000000000000000000000000005",
1758 "P256VERIFY":"0x0000000000000000000000000000000000000100",
1759 "RIPEMD160": "0x0000000000000000000000000000000000000003",
1760 "SHA256": "0x0000000000000000000000000000000000000002"
1761 },
1762 "systemContracts": {
1763 "BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02",
1764 "CONSOLIDATION_REQUEST_PREDEPLOY_ADDRESS": "0x0000bbddc7ce488642fb579f8b00f3a590007251",
1765 "DEPOSIT_CONTRACT_ADDRESS": "0x00000000219ab540356cbb839cbe05303d7705fa",
1766 "HISTORY_STORAGE_ADDRESS": "0x0000f90827f1c53a10cb7a02335b175320002935",
1767 "WITHDRAWAL_REQUEST_PREDEPLOY_ADDRESS": "0x00000961ef480eb55e80d19ad83579a64c007002"
1768 }
1769 },
1770 }
1771 });
1772 let expected_response = to_rpc_response_success_value(&json.to_string());
1773 assert_eq!(rpc_response.to_string(), expected_response.to_string())
1774 }
1775}