1use super::methods::MethodHandler;
45use super::types::{RpcError, RpcRequest, RpcResponse};
46use super::utils::get_default_rpc_port;
47use crate::EngineContext;
48use axum::{
49 extract::{Json as JsonExtract, State},
50 response::Json as JsonResponse,
51 routing::{get, post},
52 Router,
53};
54use eyre::Result;
55use revm::database::CacheDB;
56use revm::{Database, DatabaseCommit, DatabaseRef};
57use std::net::SocketAddr;
58use std::sync::Arc;
59use tokio::sync::oneshot;
60use tracing::{error, info, warn};
61
62#[derive(Debug)]
67pub struct RpcServerHandle {
68 pub addr: SocketAddr,
70 shutdown_tx: oneshot::Sender<()>,
72}
73
74impl RpcServerHandle {
75 pub fn addr(&self) -> SocketAddr {
77 self.addr
78 }
79
80 pub fn port(&self) -> u16 {
82 self.addr.port()
83 }
84
85 pub fn shutdown(self) -> Result<()> {
87 if self.shutdown_tx.send(()).is_err() {
88 warn!("RPC server already shut down");
89 }
90 Ok(())
91 }
92}
93
94#[derive(Clone)]
99struct RpcState<DB>
100where
101 DB: Database + DatabaseCommit + DatabaseRef + Clone + Send + Sync + 'static,
102 <CacheDB<DB> as Database>::Error: Clone + Send + Sync,
103 <DB as Database>::Error: Clone + Send + Sync,
104{
105 server: Arc<DebugRpcServer<DB>>,
107}
108
109pub struct DebugRpcServer<DB>
118where
119 DB: Database + DatabaseCommit + DatabaseRef + Clone + Send + Sync + 'static,
120 <CacheDB<DB> as Database>::Error: Clone + Send + Sync,
121 <DB as Database>::Error: Clone + Send + Sync,
122{
123 context: Arc<EngineContext<DB>>,
125 method_handler: Arc<MethodHandler<DB>>,
127}
128
129impl<DB> DebugRpcServer<DB>
130where
131 DB: Database + DatabaseCommit + DatabaseRef + Clone + Send + Sync + 'static,
132 <CacheDB<DB> as Database>::Error: Clone + Send + Sync,
133 <DB as Database>::Error: Clone + Send + Sync,
134{
135 pub fn new(context: EngineContext<DB>) -> Self {
137 let context = Arc::new(context);
138 let method_handler = Arc::new(MethodHandler::new(context.clone()));
139
140 Self { context, method_handler }
141 }
142
143 pub async fn start(self) -> Result<RpcServerHandle> {
145 let port = get_default_rpc_port()?;
146 self.start_on_port(port).await
147 }
148
149 pub async fn start_on_port(self, port: u16) -> Result<RpcServerHandle> {
154 let app = Router::new()
156 .route("/", post(handle_rpc_request))
157 .route("/health", get(health_check))
158 .with_state(RpcState { server: Arc::new(self) });
159
160 let addr = SocketAddr::from(([127, 0, 0, 1], port));
161 let listener = tokio::net::TcpListener::bind(addr).await?;
162 let actual_addr = listener.local_addr()?;
163
164 let (shutdown_tx, shutdown_rx) = oneshot::channel();
165
166 tokio::spawn(async move {
168 axum::serve(listener, app)
169 .with_graceful_shutdown(async {
170 shutdown_rx.await.ok();
171 })
172 .await
173 .expect("RPC server failed");
174 });
175
176 info!("Debug RPC server started on {}", actual_addr);
177
178 Ok(RpcServerHandle { addr: actual_addr, shutdown_tx })
179 }
180
181 async fn handle_request(&self, request: RpcRequest) -> RpcResponse {
183 let id = request.id.clone();
184
185 match self.method_handler.handle_method(&request.method, request.params).await {
187 Ok(result) => {
188 RpcResponse { jsonrpc: "2.0".to_string(), result: Some(result), error: None, id }
189 }
190 Err(err) => {
191 error!(target: "rpc", "Error handling RPC request: {:?}", err);
192 RpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(err), id }
193 }
194 }
195 }
196
197 pub fn snapshot_count(&self) -> usize {
199 self.context.snapshots.len()
200 }
201
202 pub fn validate_snapshot_index(&self, index: usize) -> Result<()> {
204 if index >= self.snapshot_count() {
205 return Err(eyre::eyre!(
206 "Snapshot index {} out of bounds (max: {})",
207 index,
208 self.snapshot_count() - 1
209 ));
210 }
211 Ok(())
212 }
213
214 pub fn context(&self) -> &Arc<EngineContext<DB>> {
216 &self.context
217 }
218}
219
220async fn handle_rpc_request<DB>(
222 State(state): State<RpcState<DB>>,
223 JsonExtract(request): JsonExtract<RpcRequest>,
224) -> JsonResponse<RpcResponse>
225where
226 DB: Database + DatabaseCommit + DatabaseRef + Clone + Send + Sync + 'static,
227 <CacheDB<DB> as Database>::Error: Clone + Send + Sync,
228 <DB as Database>::Error: Clone + Send + Sync,
229{
230 if request.jsonrpc != "2.0" {
232 return JsonResponse(RpcResponse {
233 jsonrpc: "2.0".to_string(),
234 result: None,
235 error: Some(RpcError {
236 code: -32600,
237 message: "Invalid Request - JSON-RPC version must be 2.0".to_string(),
238 data: None,
239 }),
240 id: request.id.clone(),
241 });
242 }
243
244 let response = state.server.handle_request(request).await;
246 JsonResponse(response)
247}
248
249async fn health_check() -> JsonResponse<serde_json::Value> {
251 JsonResponse(serde_json::json!({
252 "status": "healthy",
253 "service": "edb-debug-rpc-server",
254 "version": env!("CARGO_PKG_VERSION"),
255 "architecture": "multi-threaded"
256 }))
257}
258
259pub async fn start_debug_server<DB>(context: EngineContext<DB>) -> Result<RpcServerHandle>
261where
262 DB: Database + DatabaseCommit + DatabaseRef + Clone + Send + Sync + 'static,
263 <CacheDB<DB> as Database>::Error: Clone + Send + Sync,
264 <DB as Database>::Error: Clone + Send + Sync,
265{
266 let server = DebugRpcServer::new(context);
267 server.start().await
268}