starknet_devnet_server/api/
json_rpc_handler.rs

1use std::sync::Arc;
2
3use axum::extract::ws::{Message, WebSocket};
4use futures::stream::SplitSink;
5use futures::{SinkExt, StreamExt};
6use starknet_core::StarknetBlock;
7use starknet_core::starknet::starknet_config::DumpOn;
8use starknet_types::emitted_event::SubscriptionEmittedEvent;
9use starknet_types::rpc::block::{BlockId, BlockTag, ReorgData};
10use starknet_types::rpc::transactions::TransactionFinalityStatus;
11use tokio::sync::Mutex;
12use tracing::{info, trace};
13
14use crate::api::models::{
15    AccountAddressInput, BlockAndClassHashInput, BlockAndContractAddressInput, BlockAndIndexInput,
16    BlockIdInput, BroadcastedDeclareTransactionEnumWrapper, BroadcastedDeclareTransactionInput,
17    BroadcastedDeployAccountTransactionEnumWrapper, BroadcastedDeployAccountTransactionInput,
18    BroadcastedInvokeTransactionEnumWrapper, BroadcastedInvokeTransactionInput, CallInput,
19    ClassHashInput, DevnetSpecRequest, EstimateFeeInput, EventsInput, GetStorageInput,
20    JsonRpcRequest, JsonRpcResponse, JsonRpcWsRequest, LoadPath, SimulateTransactionsInput,
21    StarknetSpecRequest, ToRpcResponseResult, TransactionHashInput, to_json_rpc_request,
22};
23use crate::api::origin_forwarder::OriginForwarder;
24use crate::api::{Api, ApiError, error};
25use crate::dump_util::dump_event;
26use crate::restrictive_mode::is_json_rpc_method_restricted;
27use crate::rpc_core;
28use crate::rpc_core::error::{ErrorCode, RpcError};
29use crate::rpc_core::request::RpcMethodCall;
30use crate::rpc_core::response::{ResponseResult, RpcResponse};
31use crate::rpc_handler::RpcHandler;
32use crate::subscribe::{
33    NewTransactionNotification, NewTransactionReceiptNotification, NewTransactionStatus,
34    NotificationData, SocketId,
35};
36
37/// This object will be used as a shared state between HTTP calls.
38/// Is similar to the HttpApiHandler but is with extended functionality and is used for JSON-RPC
39/// methods
40#[derive(Clone)]
41pub struct JsonRpcHandler {
42    pub api: Api,
43    pub origin_caller: Option<OriginForwarder>,
44}
45
46#[async_trait::async_trait]
47impl RpcHandler for JsonRpcHandler {
48    type Request = JsonRpcRequest;
49
50    async fn on_request(
51        &self,
52        request: Self::Request,
53        original_call: RpcMethodCall,
54    ) -> ResponseResult {
55        info!(target: "rpc", "received method in on_request {}", request);
56
57        if !self.allows_method(&original_call.method) {
58            return ResponseResult::Error(RpcError::new(ErrorCode::MethodForbidden));
59        }
60
61        let is_request_forwardable = request.is_forwardable_to_origin(); // applicable if forking
62        let is_request_dumpable = request.is_dumpable();
63
64        // for later comparison and subscription notifications
65        let old_latest_block = if request.requires_notifying() {
66            Some(self.get_block_by_tag(BlockTag::Latest).await)
67        } else {
68            None
69        };
70
71        let old_pre_confirmed_block =
72            if request.requires_notifying() && self.api.config.uses_pre_confirmed_block() {
73                Some(self.get_block_by_tag(BlockTag::PreConfirmed).await)
74            } else {
75                None
76            };
77
78        let starknet_resp = self.execute(request).await;
79
80        // If locally we got an error and forking is set up, forward the request to the origin
81        if let (Err(err), Some(forwarder)) = (&starknet_resp, &self.origin_caller) {
82            if err.is_forwardable_to_origin() && is_request_forwardable {
83                // if a block or state is requested that was only added to origin after
84                // forking happened, it will be normally returned; we don't extra-handle this case
85                return forwarder.call(&original_call).await;
86            }
87        }
88
89        if starknet_resp.is_ok() && is_request_dumpable {
90            if let Err(e) = self.update_dump(&original_call).await {
91                return ResponseResult::Error(e);
92            }
93        }
94
95        if let Err(e) = self.broadcast_changes(old_latest_block, old_pre_confirmed_block).await {
96            return ResponseResult::Error(e.api_error_to_rpc_error());
97        }
98
99        starknet_resp.to_rpc_result()
100    }
101
102    async fn on_call(&self, call: RpcMethodCall) -> RpcResponse {
103        let id = call.id.clone();
104        let method = call.method.clone();
105        trace!(target: "rpc",  id = ?id, method = ?method, "received method call");
106
107        let timer = std::time::Instant::now();
108
109        let response = match to_json_rpc_request(&call) {
110            Ok(req) => {
111                let result = self.on_request(req, call).await;
112                RpcResponse::new(id, result)
113            }
114            Err(e) => RpcResponse::from_rpc_error(e, id),
115        };
116
117        // Record metrics
118        let duration = timer.elapsed().as_secs_f64();
119        let status = match &response.result {
120            crate::rpc_core::response::ResponseResult::Success(_) => "success",
121            crate::rpc_core::response::ResponseResult::Error(_) => "error",
122        };
123
124        crate::metrics::RPC_CALL_DURATION.with_label_values(&[&method]).observe(duration);
125        crate::metrics::RPC_CALL_COUNT.with_label_values(&[&method, status]).inc();
126
127        response
128    }
129
130    async fn on_websocket(&self, socket: WebSocket) {
131        let (socket_writer, mut socket_reader) = socket.split();
132        let socket_writer = Arc::new(Mutex::new(socket_writer));
133
134        let socket_id = self.api.sockets.lock().await.insert(socket_writer.clone());
135
136        // listen to new messages coming through the socket
137        let mut socket_safely_closed = false;
138        while let Some(msg) = socket_reader.next().await {
139            match msg {
140                Ok(Message::Text(text)) => {
141                    self.on_websocket_call(text.as_bytes(), socket_writer.clone(), socket_id).await;
142                }
143                Ok(Message::Binary(bytes)) => {
144                    self.on_websocket_call(&bytes, socket_writer.clone(), socket_id).await;
145                }
146                Ok(Message::Close(_)) => {
147                    socket_safely_closed = true;
148                    break;
149                }
150                other => {
151                    tracing::error!("Socket handler got an unexpected message: {other:?}")
152                }
153            }
154        }
155
156        self.api.sockets.lock().await.remove(&socket_id);
157        if socket_safely_closed {
158            tracing::info!("Websocket disconnected");
159        } else {
160            tracing::error!("Failed socket read");
161        }
162    }
163}
164
165impl JsonRpcHandler {
166    pub fn new(api: Api) -> JsonRpcHandler {
167        let origin_caller = if let (Some(url), Some(block_number)) =
168            (&api.config.fork_config.url, api.config.fork_config.block_number)
169        {
170            Some(OriginForwarder::new(url.clone(), block_number))
171        } else {
172            None
173        };
174
175        JsonRpcHandler { api, origin_caller }
176    }
177
178    /// The latest and pre_confirmed block are always defined, so to avoid having to deal with
179    /// Err/None in places where this method is called, it is defined to return an empty
180    /// accepted block, even though that case should never happen.
181    async fn get_block_by_tag(&self, tag: BlockTag) -> StarknetBlock {
182        let starknet = self.api.starknet.lock().await;
183        match starknet.get_block(&BlockId::Tag(tag)) {
184            Ok(block) => block.clone(),
185            _ => StarknetBlock::create_empty_accepted(),
186        }
187    }
188
189    async fn broadcast_pre_confirmed_tx_changes(
190        &self,
191        old_pre_confirmed_block: StarknetBlock,
192    ) -> Result<(), error::ApiError> {
193        let new_pre_confirmed_block = self.get_block_by_tag(BlockTag::PreConfirmed).await;
194        let old_pre_confirmed_txs = old_pre_confirmed_block.get_transactions();
195        let new_pre_confirmed_txs = new_pre_confirmed_block.get_transactions();
196
197        if new_pre_confirmed_txs.len() > old_pre_confirmed_txs.len() {
198            #[allow(clippy::expect_used)]
199            let new_tx_hash = new_pre_confirmed_txs.last().expect("has at least one element");
200
201            let mut notifications = vec![];
202            let starknet = self.api.starknet.lock().await;
203
204            let status = starknet
205                .get_transaction_execution_and_finality_status(*new_tx_hash)
206                .map_err(error::ApiError::StarknetDevnetError)?;
207            notifications.push(NotificationData::TransactionStatus(NewTransactionStatus {
208                transaction_hash: *new_tx_hash,
209                status,
210            }));
211
212            let tx = starknet
213                .get_transaction_by_hash(*new_tx_hash)
214                .map_err(error::ApiError::StarknetDevnetError)?;
215            notifications.push(NotificationData::NewTransaction(NewTransactionNotification {
216                tx: tx.clone(),
217                finality_status: TransactionFinalityStatus::PreConfirmed,
218            }));
219
220            let receipt = starknet
221                .get_transaction_receipt_by_hash(new_tx_hash)
222                .map_err(error::ApiError::StarknetDevnetError)?;
223
224            notifications.push(NotificationData::NewTransactionReceipt(
225                NewTransactionReceiptNotification {
226                    tx_receipt: receipt,
227                    sender_address: tx.get_sender_address(),
228                },
229            ));
230
231            let events = starknet.get_unlimited_events(
232                Some(BlockId::Tag(BlockTag::PreConfirmed)),
233                Some(BlockId::Tag(BlockTag::PreConfirmed)),
234                None,
235                None,
236                None, // pre-confirmed block only has pre-confirmed txs
237            )?;
238
239            drop(starknet); // Drop immediately after last use
240
241            for emitted_event in events.into_iter().filter(|e| &e.transaction_hash == new_tx_hash) {
242                notifications.push(NotificationData::Event(SubscriptionEmittedEvent {
243                    emitted_event,
244                    finality_status: TransactionFinalityStatus::PreConfirmed,
245                }));
246            }
247
248            self.api.sockets.lock().await.notify_subscribers(&notifications).await;
249        }
250
251        Ok(())
252    }
253
254    async fn broadcast_latest_changes(
255        &self,
256        new_latest_block: StarknetBlock,
257    ) -> Result<(), error::ApiError> {
258        let block_header = (&new_latest_block).into();
259        let mut notifications = vec![NotificationData::NewHeads(block_header)];
260
261        let starknet = self.api.starknet.lock().await;
262
263        let finality_status = TransactionFinalityStatus::AcceptedOnL2;
264        let latest_txs = new_latest_block.get_transactions();
265        for tx_hash in latest_txs {
266            let tx = starknet
267                .get_transaction_by_hash(*tx_hash)
268                .map_err(error::ApiError::StarknetDevnetError)?;
269            notifications.push(NotificationData::NewTransaction(NewTransactionNotification {
270                tx: tx.clone(),
271                finality_status,
272            }));
273
274            let status = starknet
275                .get_transaction_execution_and_finality_status(*tx_hash)
276                .map_err(error::ApiError::StarknetDevnetError)?;
277            notifications.push(NotificationData::TransactionStatus(NewTransactionStatus {
278                transaction_hash: *tx_hash,
279                status,
280            }));
281
282            let tx_receipt = starknet
283                .get_transaction_receipt_by_hash(tx_hash)
284                .map_err(error::ApiError::StarknetDevnetError)?;
285            notifications.push(NotificationData::NewTransactionReceipt(
286                NewTransactionReceiptNotification {
287                    tx_receipt,
288                    sender_address: tx.get_sender_address(),
289                },
290            ));
291        }
292
293        let events = starknet.get_unlimited_events(
294            Some(BlockId::Tag(BlockTag::Latest)),
295            Some(BlockId::Tag(BlockTag::Latest)),
296            None,
297            None,
298            None, // latest block only has txs accepted on L2
299        )?;
300
301        drop(starknet); // Drop immediately after last use
302
303        for emitted_event in events {
304            notifications.push(NotificationData::Event(SubscriptionEmittedEvent {
305                emitted_event,
306                finality_status,
307            }));
308        }
309
310        self.api.sockets.lock().await.notify_subscribers(&notifications).await;
311        Ok(())
312    }
313
314    /// Notify subscribers of what they are subscribed to.
315    async fn broadcast_changes(
316        &self,
317        old_latest_block: Option<StarknetBlock>,
318        old_pre_confirmed_block: Option<StarknetBlock>,
319    ) -> Result<(), error::ApiError> {
320        let Some(old_latest_block) = old_latest_block else {
321            return Ok(());
322        };
323
324        if let Some(old_pre_confirmed_block) = old_pre_confirmed_block {
325            self.broadcast_pre_confirmed_tx_changes(old_pre_confirmed_block).await?;
326        }
327
328        let new_latest_block = self.get_block_by_tag(BlockTag::Latest).await;
329
330        match new_latest_block.block_number().cmp(&old_latest_block.block_number()) {
331            std::cmp::Ordering::Less => {
332                self.broadcast_reorg(old_latest_block, new_latest_block).await?
333            }
334            std::cmp::Ordering::Equal => { /* no changes required */ }
335            std::cmp::Ordering::Greater => self.broadcast_latest_changes(new_latest_block).await?,
336        }
337
338        Ok(())
339    }
340
341    async fn broadcast_reorg(
342        &self,
343        old_latest_block: StarknetBlock,
344        new_latest_block: StarknetBlock,
345    ) -> Result<(), ApiError> {
346        let last_aborted_block_hash =
347            *self.api.starknet.lock().await.last_aborted_block_hash().ok_or(
348                ApiError::StarknetDevnetError(
349                    starknet_core::error::Error::UnexpectedInternalError {
350                        msg: "Aborted block hash should be defined.".into(),
351                    },
352                ),
353            )?;
354
355        let notification = NotificationData::Reorg(ReorgData {
356            starting_block_hash: last_aborted_block_hash,
357            starting_block_number: new_latest_block.block_number().unchecked_next(),
358            ending_block_hash: old_latest_block.block_hash(),
359            ending_block_number: old_latest_block.block_number(),
360        });
361
362        self.api.sockets.lock().await.notify_subscribers(&[notification]).await;
363        Ok(())
364    }
365
366    /// Matches the request to the corresponding enum variant and executes the request.
367    async fn execute(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, error::ApiError> {
368        trace!(target: "JsonRpcHandler::execute", "executing request");
369        match req {
370            JsonRpcRequest::StarknetSpecRequest(req) => self.execute_starknet_spec(req).await,
371            JsonRpcRequest::DevnetSpecRequest(req) => self.execute_devnet_spec(req).await,
372        }
373    }
374
375    async fn execute_starknet_spec(
376        &self,
377        req: StarknetSpecRequest,
378    ) -> Result<JsonRpcResponse, error::ApiError> {
379        match req {
380            StarknetSpecRequest::SpecVersion => self.spec_version(),
381            StarknetSpecRequest::BlockWithTransactionHashes(block) => {
382                self.get_block_with_tx_hashes(block.block_id).await
383            }
384            StarknetSpecRequest::BlockWithFullTransactions(block) => {
385                self.get_block_with_txs(block.block_id).await
386            }
387            StarknetSpecRequest::BlockWithReceipts(block) => {
388                self.get_block_with_receipts(block.block_id).await
389            }
390            StarknetSpecRequest::StateUpdate(block) => self.get_state_update(block.block_id).await,
391            StarknetSpecRequest::StorageAt(GetStorageInput { contract_address, key, block_id }) => {
392                self.get_storage_at(contract_address, key, block_id).await
393            }
394            StarknetSpecRequest::TransactionStatusByHash(TransactionHashInput {
395                transaction_hash,
396            }) => self.get_transaction_status_by_hash(transaction_hash).await,
397            StarknetSpecRequest::TransactionByHash(TransactionHashInput { transaction_hash }) => {
398                self.get_transaction_by_hash(transaction_hash).await
399            }
400            StarknetSpecRequest::TransactionByBlockAndIndex(BlockAndIndexInput {
401                block_id,
402                index,
403            }) => self.get_transaction_by_block_id_and_index(block_id, index).await,
404            StarknetSpecRequest::TransactionReceiptByTransactionHash(TransactionHashInput {
405                transaction_hash,
406            }) => self.get_transaction_receipt_by_hash(transaction_hash).await,
407            StarknetSpecRequest::ClassByHash(BlockAndClassHashInput { block_id, class_hash }) => {
408                self.get_class(block_id, class_hash).await
409            }
410            StarknetSpecRequest::CompiledCasmByClassHash(ClassHashInput { class_hash }) => {
411                self.get_compiled_casm(class_hash).await
412            }
413            StarknetSpecRequest::ClassHashAtContractAddress(BlockAndContractAddressInput {
414                block_id,
415                contract_address,
416            }) => self.get_class_hash_at(block_id, contract_address).await,
417            StarknetSpecRequest::ClassAtContractAddress(BlockAndContractAddressInput {
418                block_id,
419                contract_address,
420            }) => self.get_class_at(block_id, contract_address).await,
421            StarknetSpecRequest::BlockTransactionCount(block) => {
422                self.get_block_txs_count(block.block_id).await
423            }
424            StarknetSpecRequest::Call(CallInput { request, block_id }) => {
425                self.call(block_id, request).await
426            }
427            StarknetSpecRequest::EstimateFee(EstimateFeeInput {
428                request,
429                block_id,
430                simulation_flags,
431            }) => self.estimate_fee(block_id, request, simulation_flags).await,
432            StarknetSpecRequest::BlockNumber => self.block_number().await,
433            StarknetSpecRequest::BlockHashAndNumber => self.block_hash_and_number().await,
434            StarknetSpecRequest::ChainId => self.chain_id().await,
435            StarknetSpecRequest::Syncing => self.syncing().await,
436            StarknetSpecRequest::Events(EventsInput { filter }) => self.get_events(filter).await,
437            StarknetSpecRequest::ContractNonce(BlockAndContractAddressInput {
438                block_id,
439                contract_address,
440            }) => self.get_nonce(block_id, contract_address).await,
441            StarknetSpecRequest::AddDeclareTransaction(BroadcastedDeclareTransactionInput {
442                declare_transaction,
443            }) => {
444                let BroadcastedDeclareTransactionEnumWrapper::Declare(broadcasted_transaction) =
445                    declare_transaction;
446                self.add_declare_transaction(broadcasted_transaction).await
447            }
448            StarknetSpecRequest::AddDeployAccountTransaction(
449                BroadcastedDeployAccountTransactionInput { deploy_account_transaction },
450            ) => {
451                let BroadcastedDeployAccountTransactionEnumWrapper::DeployAccount(
452                    broadcasted_transaction,
453                ) = deploy_account_transaction;
454                self.add_deploy_account_transaction(broadcasted_transaction).await
455            }
456            StarknetSpecRequest::AddInvokeTransaction(BroadcastedInvokeTransactionInput {
457                invoke_transaction,
458            }) => {
459                let BroadcastedInvokeTransactionEnumWrapper::Invoke(broadcasted_transaction) =
460                    invoke_transaction;
461                self.add_invoke_transaction(broadcasted_transaction).await
462            }
463            StarknetSpecRequest::EstimateMessageFee(request) => {
464                self.estimate_message_fee(request.get_block_id(), request.get_raw_message().clone())
465                    .await
466            }
467            StarknetSpecRequest::SimulateTransactions(SimulateTransactionsInput {
468                block_id,
469                transactions,
470                simulation_flags,
471            }) => self.simulate_transactions(block_id, transactions, simulation_flags).await,
472            StarknetSpecRequest::TraceTransaction(TransactionHashInput { transaction_hash }) => {
473                self.get_trace_transaction(transaction_hash).await
474            }
475            StarknetSpecRequest::BlockTransactionTraces(BlockIdInput { block_id }) => {
476                self.get_trace_block_transactions(block_id).await
477            }
478            StarknetSpecRequest::MessagesStatusByL1Hash(data) => {
479                self.get_messages_status(data).await
480            }
481            StarknetSpecRequest::StorageProof(data) => self.get_storage_proof(data).await,
482        }
483    }
484
485    async fn execute_devnet_spec(
486        &self,
487        req: DevnetSpecRequest,
488    ) -> Result<JsonRpcResponse, error::ApiError> {
489        match req {
490            DevnetSpecRequest::ImpersonateAccount(AccountAddressInput { account_address }) => {
491                self.impersonate_account(account_address).await
492            }
493            DevnetSpecRequest::StopImpersonateAccount(AccountAddressInput { account_address }) => {
494                self.stop_impersonating_account(account_address).await
495            }
496            DevnetSpecRequest::AutoImpersonate => self.set_auto_impersonate(true).await,
497            DevnetSpecRequest::StopAutoImpersonate => self.set_auto_impersonate(false).await,
498            DevnetSpecRequest::Dump(path) => self.dump(path).await,
499            DevnetSpecRequest::Load(LoadPath { path }) => self.load(path).await,
500            DevnetSpecRequest::PostmanLoadL1MessagingContract(data) => {
501                self.postman_load(data).await
502            }
503            DevnetSpecRequest::PostmanFlush(data) => self.postman_flush(data).await,
504            DevnetSpecRequest::PostmanSendMessageToL2(message) => {
505                self.postman_send_message_to_l2(message).await
506            }
507            DevnetSpecRequest::PostmanConsumeMessageFromL2(message) => {
508                self.postman_consume_message_from_l2(message).await
509            }
510            DevnetSpecRequest::CreateBlock => self.create_block().await,
511            DevnetSpecRequest::AbortBlocks(data) => self.abort_blocks(data).await,
512            DevnetSpecRequest::AcceptOnL1(data) => self.accept_on_l1(data).await,
513            DevnetSpecRequest::SetGasPrice(data) => self.set_gas_price(data).await,
514            DevnetSpecRequest::Restart(data) => self.restart(data).await,
515            DevnetSpecRequest::SetTime(data) => self.set_time(data).await,
516            DevnetSpecRequest::IncreaseTime(data) => self.increase_time(data).await,
517            DevnetSpecRequest::PredeployedAccounts(data) => {
518                self.get_predeployed_accounts(data).await
519            }
520            DevnetSpecRequest::AccountBalance(data) => self.get_account_balance(data).await,
521            DevnetSpecRequest::Mint(data) => self.mint(data).await,
522            DevnetSpecRequest::DevnetConfig => self.get_devnet_config().await,
523        }
524    }
525
526    /// Takes `bytes` to be an encoded RPC call, executes it, and sends the response back via `ws`.
527    async fn on_websocket_call(
528        &self,
529        bytes: &[u8],
530        ws: Arc<Mutex<SplitSink<WebSocket, Message>>>,
531        socket_id: SocketId,
532    ) {
533        let error_serialized = match serde_json::from_slice(bytes) {
534            Ok(rpc_call) => match self.on_websocket_rpc_call(&rpc_call, socket_id).await {
535                Ok(_) => return,
536                Err(e) => serde_json::to_string(&RpcResponse::from_rpc_error(e, rpc_call.id))
537                    .unwrap_or_default(),
538            },
539            Err(e) => serde_json::to_string(&RpcResponse::from_rpc_error(
540                RpcError::parse_error(e.to_string()),
541                rpc_core::request::Id::Null,
542            ))
543            .unwrap_or_default(),
544        };
545
546        if let Err(e) = ws.lock().await.send(Message::Text(error_serialized.into())).await {
547            tracing::error!("Error sending websocket message: {e}");
548        }
549    }
550
551    fn allows_method(&self, method: &str) -> bool {
552        if let Some(restricted_methods) = &self.api.server_config.restricted_methods {
553            if is_json_rpc_method_restricted(method, restricted_methods) {
554                return false;
555            }
556        }
557
558        true
559    }
560
561    /// Since some subscriptions might need to send multiple messages, sending messages other than
562    /// errors is left to individual RPC method handlers and this method returns an empty successful
563    /// Result. A one-time request also returns an empty successful result, but actually sends the
564    /// message.
565    async fn on_websocket_rpc_call(
566        &self,
567        call: &RpcMethodCall,
568        socket_id: SocketId,
569    ) -> Result<(), RpcError> {
570        trace!(target: "rpc",  id = ?call.id, method = ?call.method, "received websocket call");
571
572        let timer = std::time::Instant::now();
573        let method = call.method.clone();
574
575        let req: JsonRpcWsRequest = to_json_rpc_request(call)?;
576        let result = match req {
577            JsonRpcWsRequest::OneTimeRequest(req) => {
578                let resp_result = self.on_request(*req, call.clone()).await;
579                let mut sockets = self.api.sockets.lock().await;
580
581                let socket_context =
582                    sockets.get_mut(&socket_id).map_err(|e| e.api_error_to_rpc_error())?;
583
584                match resp_result {
585                    ResponseResult::Success(result_value) => {
586                        socket_context.send_rpc_response(result_value, call.id.clone()).await;
587                        Ok(())
588                    }
589                    ResponseResult::Error(rpc_error) => Err(rpc_error),
590                }
591            }
592            JsonRpcWsRequest::SubscriptionRequest(req) => self
593                .execute_ws_subscription(req, call.id.clone(), socket_id)
594                .await
595                .map_err(|e| e.api_error_to_rpc_error()),
596        };
597
598        // Record metrics
599        let duration = timer.elapsed().as_secs_f64();
600        let status = if result.is_ok() { "success" } else { "error" };
601        crate::metrics::RPC_CALL_DURATION.with_label_values(&[&method]).observe(duration);
602        crate::metrics::RPC_CALL_COUNT.with_label_values(&[&method, status]).inc();
603
604        result
605    }
606
607    async fn update_dump(&self, event: &RpcMethodCall) -> Result<(), RpcError> {
608        match self.api.config.dump_on {
609            Some(DumpOn::Block) => {
610                let dump_path = self
611                    .api
612                    .config
613                    .dump_path
614                    .as_deref()
615                    .ok_or(RpcError::internal_error_with("Undefined dump_path"))?;
616
617                dump_event(event, dump_path).map_err(|e| {
618                    let msg = format!("Failed dumping of {}: {e}", event.method);
619                    RpcError::internal_error_with(msg)
620                })?;
621            }
622            Some(DumpOn::Request | DumpOn::Exit) => {
623                self.api.dumpable_events.lock().await.push(event.clone())
624            }
625            None => (),
626        }
627
628        Ok(())
629    }
630
631    pub async fn re_execute(&self, events: &[RpcMethodCall]) -> Result<(), RpcError> {
632        for event in events {
633            if let ResponseResult::Error(e) = self.on_call(event.clone()).await.result {
634                return Err(e);
635            }
636        }
637        Ok(())
638    }
639}