1use std::sync::Arc;
2
3use axum::extract::ws::{Message, WebSocket};
4use futures::stream::SplitSink;
5use futures::{SinkExt, StreamExt};
6use starknet_core::StarknetBlock;
7use starknet_core::starknet::starknet_config::DumpOn;
8use starknet_types::emitted_event::SubscriptionEmittedEvent;
9use starknet_types::rpc::block::{BlockId, BlockTag, ReorgData};
10use starknet_types::rpc::transactions::TransactionFinalityStatus;
11use tokio::sync::Mutex;
12use tracing::{info, trace};
13
14use crate::api::models::{
15 AccountAddressInput, BlockAndClassHashInput, BlockAndContractAddressInput, BlockAndIndexInput,
16 BlockIdInput, BroadcastedDeclareTransactionEnumWrapper, BroadcastedDeclareTransactionInput,
17 BroadcastedDeployAccountTransactionEnumWrapper, BroadcastedDeployAccountTransactionInput,
18 BroadcastedInvokeTransactionEnumWrapper, BroadcastedInvokeTransactionInput, CallInput,
19 ClassHashInput, DevnetSpecRequest, EstimateFeeInput, EventsInput, GetStorageInput,
20 JsonRpcRequest, JsonRpcResponse, JsonRpcWsRequest, LoadPath, SimulateTransactionsInput,
21 StarknetSpecRequest, ToRpcResponseResult, TransactionHashInput, to_json_rpc_request,
22};
23use crate::api::origin_forwarder::OriginForwarder;
24use crate::api::{Api, ApiError, error};
25use crate::dump_util::dump_event;
26use crate::restrictive_mode::is_json_rpc_method_restricted;
27use crate::rpc_core;
28use crate::rpc_core::error::{ErrorCode, RpcError};
29use crate::rpc_core::request::RpcMethodCall;
30use crate::rpc_core::response::{ResponseResult, RpcResponse};
31use crate::rpc_handler::RpcHandler;
32use crate::subscribe::{
33 NewTransactionNotification, NewTransactionReceiptNotification, NewTransactionStatus,
34 NotificationData, SocketId,
35};
36
37#[derive(Clone)]
41pub struct JsonRpcHandler {
42 pub api: Api,
43 pub origin_caller: Option<OriginForwarder>,
44}
45
46#[async_trait::async_trait]
47impl RpcHandler for JsonRpcHandler {
48 type Request = JsonRpcRequest;
49
50 async fn on_request(
51 &self,
52 request: Self::Request,
53 original_call: RpcMethodCall,
54 ) -> ResponseResult {
55 info!(target: "rpc", "received method in on_request {}", request);
56
57 if !self.allows_method(&original_call.method) {
58 return ResponseResult::Error(RpcError::new(ErrorCode::MethodForbidden));
59 }
60
61 let is_request_forwardable = request.is_forwardable_to_origin(); let is_request_dumpable = request.is_dumpable();
63
64 let old_latest_block = if request.requires_notifying() {
66 Some(self.get_block_by_tag(BlockTag::Latest).await)
67 } else {
68 None
69 };
70
71 let old_pre_confirmed_block =
72 if request.requires_notifying() && self.api.config.uses_pre_confirmed_block() {
73 Some(self.get_block_by_tag(BlockTag::PreConfirmed).await)
74 } else {
75 None
76 };
77
78 let starknet_resp = self.execute(request).await;
79
80 if let (Err(err), Some(forwarder)) = (&starknet_resp, &self.origin_caller) {
82 if err.is_forwardable_to_origin() && is_request_forwardable {
83 return forwarder.call(&original_call).await;
86 }
87 }
88
89 if starknet_resp.is_ok() && is_request_dumpable {
90 if let Err(e) = self.update_dump(&original_call).await {
91 return ResponseResult::Error(e);
92 }
93 }
94
95 if let Err(e) = self.broadcast_changes(old_latest_block, old_pre_confirmed_block).await {
96 return ResponseResult::Error(e.api_error_to_rpc_error());
97 }
98
99 starknet_resp.to_rpc_result()
100 }
101
102 async fn on_call(&self, call: RpcMethodCall) -> RpcResponse {
103 let id = call.id.clone();
104 let method = call.method.clone();
105 trace!(target: "rpc", id = ?id, method = ?method, "received method call");
106
107 let timer = std::time::Instant::now();
108
109 let response = match to_json_rpc_request(&call) {
110 Ok(req) => {
111 let result = self.on_request(req, call).await;
112 RpcResponse::new(id, result)
113 }
114 Err(e) => RpcResponse::from_rpc_error(e, id),
115 };
116
117 let duration = timer.elapsed().as_secs_f64();
119 let status = match &response.result {
120 crate::rpc_core::response::ResponseResult::Success(_) => "success",
121 crate::rpc_core::response::ResponseResult::Error(_) => "error",
122 };
123
124 crate::metrics::RPC_CALL_DURATION.with_label_values(&[&method]).observe(duration);
125 crate::metrics::RPC_CALL_COUNT.with_label_values(&[&method, status]).inc();
126
127 response
128 }
129
130 async fn on_websocket(&self, socket: WebSocket) {
131 let (socket_writer, mut socket_reader) = socket.split();
132 let socket_writer = Arc::new(Mutex::new(socket_writer));
133
134 let socket_id = self.api.sockets.lock().await.insert(socket_writer.clone());
135
136 let mut socket_safely_closed = false;
138 while let Some(msg) = socket_reader.next().await {
139 match msg {
140 Ok(Message::Text(text)) => {
141 self.on_websocket_call(text.as_bytes(), socket_writer.clone(), socket_id).await;
142 }
143 Ok(Message::Binary(bytes)) => {
144 self.on_websocket_call(&bytes, socket_writer.clone(), socket_id).await;
145 }
146 Ok(Message::Close(_)) => {
147 socket_safely_closed = true;
148 break;
149 }
150 other => {
151 tracing::error!("Socket handler got an unexpected message: {other:?}")
152 }
153 }
154 }
155
156 self.api.sockets.lock().await.remove(&socket_id);
157 if socket_safely_closed {
158 tracing::info!("Websocket disconnected");
159 } else {
160 tracing::error!("Failed socket read");
161 }
162 }
163}
164
165impl JsonRpcHandler {
166 pub fn new(api: Api) -> JsonRpcHandler {
167 let origin_caller = if let (Some(url), Some(block_number)) =
168 (&api.config.fork_config.url, api.config.fork_config.block_number)
169 {
170 Some(OriginForwarder::new(url.clone(), block_number))
171 } else {
172 None
173 };
174
175 JsonRpcHandler { api, origin_caller }
176 }
177
178 async fn get_block_by_tag(&self, tag: BlockTag) -> StarknetBlock {
182 let starknet = self.api.starknet.lock().await;
183 match starknet.get_block(&BlockId::Tag(tag)) {
184 Ok(block) => block.clone(),
185 _ => StarknetBlock::create_empty_accepted(),
186 }
187 }
188
189 async fn broadcast_pre_confirmed_tx_changes(
190 &self,
191 old_pre_confirmed_block: StarknetBlock,
192 ) -> Result<(), error::ApiError> {
193 let new_pre_confirmed_block = self.get_block_by_tag(BlockTag::PreConfirmed).await;
194 let old_pre_confirmed_txs = old_pre_confirmed_block.get_transactions();
195 let new_pre_confirmed_txs = new_pre_confirmed_block.get_transactions();
196
197 if new_pre_confirmed_txs.len() > old_pre_confirmed_txs.len() {
198 #[allow(clippy::expect_used)]
199 let new_tx_hash = new_pre_confirmed_txs.last().expect("has at least one element");
200
201 let mut notifications = vec![];
202 let starknet = self.api.starknet.lock().await;
203
204 let status = starknet
205 .get_transaction_execution_and_finality_status(*new_tx_hash)
206 .map_err(error::ApiError::StarknetDevnetError)?;
207 notifications.push(NotificationData::TransactionStatus(NewTransactionStatus {
208 transaction_hash: *new_tx_hash,
209 status,
210 }));
211
212 let tx = starknet
213 .get_transaction_by_hash(*new_tx_hash)
214 .map_err(error::ApiError::StarknetDevnetError)?;
215 notifications.push(NotificationData::NewTransaction(NewTransactionNotification {
216 tx: tx.clone(),
217 finality_status: TransactionFinalityStatus::PreConfirmed,
218 }));
219
220 let receipt = starknet
221 .get_transaction_receipt_by_hash(new_tx_hash)
222 .map_err(error::ApiError::StarknetDevnetError)?;
223
224 notifications.push(NotificationData::NewTransactionReceipt(
225 NewTransactionReceiptNotification {
226 tx_receipt: receipt,
227 sender_address: tx.get_sender_address(),
228 },
229 ));
230
231 let events = starknet.get_unlimited_events(
232 Some(BlockId::Tag(BlockTag::PreConfirmed)),
233 Some(BlockId::Tag(BlockTag::PreConfirmed)),
234 None,
235 None,
236 None, )?;
238
239 drop(starknet); for emitted_event in events.into_iter().filter(|e| &e.transaction_hash == new_tx_hash) {
242 notifications.push(NotificationData::Event(SubscriptionEmittedEvent {
243 emitted_event,
244 finality_status: TransactionFinalityStatus::PreConfirmed,
245 }));
246 }
247
248 self.api.sockets.lock().await.notify_subscribers(¬ifications).await;
249 }
250
251 Ok(())
252 }
253
254 async fn broadcast_latest_changes(
255 &self,
256 new_latest_block: StarknetBlock,
257 ) -> Result<(), error::ApiError> {
258 let block_header = (&new_latest_block).into();
259 let mut notifications = vec![NotificationData::NewHeads(block_header)];
260
261 let starknet = self.api.starknet.lock().await;
262
263 let finality_status = TransactionFinalityStatus::AcceptedOnL2;
264 let latest_txs = new_latest_block.get_transactions();
265 for tx_hash in latest_txs {
266 let tx = starknet
267 .get_transaction_by_hash(*tx_hash)
268 .map_err(error::ApiError::StarknetDevnetError)?;
269 notifications.push(NotificationData::NewTransaction(NewTransactionNotification {
270 tx: tx.clone(),
271 finality_status,
272 }));
273
274 let status = starknet
275 .get_transaction_execution_and_finality_status(*tx_hash)
276 .map_err(error::ApiError::StarknetDevnetError)?;
277 notifications.push(NotificationData::TransactionStatus(NewTransactionStatus {
278 transaction_hash: *tx_hash,
279 status,
280 }));
281
282 let tx_receipt = starknet
283 .get_transaction_receipt_by_hash(tx_hash)
284 .map_err(error::ApiError::StarknetDevnetError)?;
285 notifications.push(NotificationData::NewTransactionReceipt(
286 NewTransactionReceiptNotification {
287 tx_receipt,
288 sender_address: tx.get_sender_address(),
289 },
290 ));
291 }
292
293 let events = starknet.get_unlimited_events(
294 Some(BlockId::Tag(BlockTag::Latest)),
295 Some(BlockId::Tag(BlockTag::Latest)),
296 None,
297 None,
298 None, )?;
300
301 drop(starknet); for emitted_event in events {
304 notifications.push(NotificationData::Event(SubscriptionEmittedEvent {
305 emitted_event,
306 finality_status,
307 }));
308 }
309
310 self.api.sockets.lock().await.notify_subscribers(¬ifications).await;
311 Ok(())
312 }
313
314 async fn broadcast_changes(
316 &self,
317 old_latest_block: Option<StarknetBlock>,
318 old_pre_confirmed_block: Option<StarknetBlock>,
319 ) -> Result<(), error::ApiError> {
320 let Some(old_latest_block) = old_latest_block else {
321 return Ok(());
322 };
323
324 if let Some(old_pre_confirmed_block) = old_pre_confirmed_block {
325 self.broadcast_pre_confirmed_tx_changes(old_pre_confirmed_block).await?;
326 }
327
328 let new_latest_block = self.get_block_by_tag(BlockTag::Latest).await;
329
330 match new_latest_block.block_number().cmp(&old_latest_block.block_number()) {
331 std::cmp::Ordering::Less => {
332 self.broadcast_reorg(old_latest_block, new_latest_block).await?
333 }
334 std::cmp::Ordering::Equal => { }
335 std::cmp::Ordering::Greater => self.broadcast_latest_changes(new_latest_block).await?,
336 }
337
338 Ok(())
339 }
340
341 async fn broadcast_reorg(
342 &self,
343 old_latest_block: StarknetBlock,
344 new_latest_block: StarknetBlock,
345 ) -> Result<(), ApiError> {
346 let last_aborted_block_hash =
347 *self.api.starknet.lock().await.last_aborted_block_hash().ok_or(
348 ApiError::StarknetDevnetError(
349 starknet_core::error::Error::UnexpectedInternalError {
350 msg: "Aborted block hash should be defined.".into(),
351 },
352 ),
353 )?;
354
355 let notification = NotificationData::Reorg(ReorgData {
356 starting_block_hash: last_aborted_block_hash,
357 starting_block_number: new_latest_block.block_number().unchecked_next(),
358 ending_block_hash: old_latest_block.block_hash(),
359 ending_block_number: old_latest_block.block_number(),
360 });
361
362 self.api.sockets.lock().await.notify_subscribers(&[notification]).await;
363 Ok(())
364 }
365
366 async fn execute(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, error::ApiError> {
368 trace!(target: "JsonRpcHandler::execute", "executing request");
369 match req {
370 JsonRpcRequest::StarknetSpecRequest(req) => self.execute_starknet_spec(req).await,
371 JsonRpcRequest::DevnetSpecRequest(req) => self.execute_devnet_spec(req).await,
372 }
373 }
374
375 async fn execute_starknet_spec(
376 &self,
377 req: StarknetSpecRequest,
378 ) -> Result<JsonRpcResponse, error::ApiError> {
379 match req {
380 StarknetSpecRequest::SpecVersion => self.spec_version(),
381 StarknetSpecRequest::BlockWithTransactionHashes(block) => {
382 self.get_block_with_tx_hashes(block.block_id).await
383 }
384 StarknetSpecRequest::BlockWithFullTransactions(block) => {
385 self.get_block_with_txs(block.block_id).await
386 }
387 StarknetSpecRequest::BlockWithReceipts(block) => {
388 self.get_block_with_receipts(block.block_id).await
389 }
390 StarknetSpecRequest::StateUpdate(block) => self.get_state_update(block.block_id).await,
391 StarknetSpecRequest::StorageAt(GetStorageInput { contract_address, key, block_id }) => {
392 self.get_storage_at(contract_address, key, block_id).await
393 }
394 StarknetSpecRequest::TransactionStatusByHash(TransactionHashInput {
395 transaction_hash,
396 }) => self.get_transaction_status_by_hash(transaction_hash).await,
397 StarknetSpecRequest::TransactionByHash(TransactionHashInput { transaction_hash }) => {
398 self.get_transaction_by_hash(transaction_hash).await
399 }
400 StarknetSpecRequest::TransactionByBlockAndIndex(BlockAndIndexInput {
401 block_id,
402 index,
403 }) => self.get_transaction_by_block_id_and_index(block_id, index).await,
404 StarknetSpecRequest::TransactionReceiptByTransactionHash(TransactionHashInput {
405 transaction_hash,
406 }) => self.get_transaction_receipt_by_hash(transaction_hash).await,
407 StarknetSpecRequest::ClassByHash(BlockAndClassHashInput { block_id, class_hash }) => {
408 self.get_class(block_id, class_hash).await
409 }
410 StarknetSpecRequest::CompiledCasmByClassHash(ClassHashInput { class_hash }) => {
411 self.get_compiled_casm(class_hash).await
412 }
413 StarknetSpecRequest::ClassHashAtContractAddress(BlockAndContractAddressInput {
414 block_id,
415 contract_address,
416 }) => self.get_class_hash_at(block_id, contract_address).await,
417 StarknetSpecRequest::ClassAtContractAddress(BlockAndContractAddressInput {
418 block_id,
419 contract_address,
420 }) => self.get_class_at(block_id, contract_address).await,
421 StarknetSpecRequest::BlockTransactionCount(block) => {
422 self.get_block_txs_count(block.block_id).await
423 }
424 StarknetSpecRequest::Call(CallInput { request, block_id }) => {
425 self.call(block_id, request).await
426 }
427 StarknetSpecRequest::EstimateFee(EstimateFeeInput {
428 request,
429 block_id,
430 simulation_flags,
431 }) => self.estimate_fee(block_id, request, simulation_flags).await,
432 StarknetSpecRequest::BlockNumber => self.block_number().await,
433 StarknetSpecRequest::BlockHashAndNumber => self.block_hash_and_number().await,
434 StarknetSpecRequest::ChainId => self.chain_id().await,
435 StarknetSpecRequest::Syncing => self.syncing().await,
436 StarknetSpecRequest::Events(EventsInput { filter }) => self.get_events(filter).await,
437 StarknetSpecRequest::ContractNonce(BlockAndContractAddressInput {
438 block_id,
439 contract_address,
440 }) => self.get_nonce(block_id, contract_address).await,
441 StarknetSpecRequest::AddDeclareTransaction(BroadcastedDeclareTransactionInput {
442 declare_transaction,
443 }) => {
444 let BroadcastedDeclareTransactionEnumWrapper::Declare(broadcasted_transaction) =
445 declare_transaction;
446 self.add_declare_transaction(broadcasted_transaction).await
447 }
448 StarknetSpecRequest::AddDeployAccountTransaction(
449 BroadcastedDeployAccountTransactionInput { deploy_account_transaction },
450 ) => {
451 let BroadcastedDeployAccountTransactionEnumWrapper::DeployAccount(
452 broadcasted_transaction,
453 ) = deploy_account_transaction;
454 self.add_deploy_account_transaction(broadcasted_transaction).await
455 }
456 StarknetSpecRequest::AddInvokeTransaction(BroadcastedInvokeTransactionInput {
457 invoke_transaction,
458 }) => {
459 let BroadcastedInvokeTransactionEnumWrapper::Invoke(broadcasted_transaction) =
460 invoke_transaction;
461 self.add_invoke_transaction(broadcasted_transaction).await
462 }
463 StarknetSpecRequest::EstimateMessageFee(request) => {
464 self.estimate_message_fee(request.get_block_id(), request.get_raw_message().clone())
465 .await
466 }
467 StarknetSpecRequest::SimulateTransactions(SimulateTransactionsInput {
468 block_id,
469 transactions,
470 simulation_flags,
471 }) => self.simulate_transactions(block_id, transactions, simulation_flags).await,
472 StarknetSpecRequest::TraceTransaction(TransactionHashInput { transaction_hash }) => {
473 self.get_trace_transaction(transaction_hash).await
474 }
475 StarknetSpecRequest::BlockTransactionTraces(BlockIdInput { block_id }) => {
476 self.get_trace_block_transactions(block_id).await
477 }
478 StarknetSpecRequest::MessagesStatusByL1Hash(data) => {
479 self.get_messages_status(data).await
480 }
481 StarknetSpecRequest::StorageProof(data) => self.get_storage_proof(data).await,
482 }
483 }
484
485 async fn execute_devnet_spec(
486 &self,
487 req: DevnetSpecRequest,
488 ) -> Result<JsonRpcResponse, error::ApiError> {
489 match req {
490 DevnetSpecRequest::ImpersonateAccount(AccountAddressInput { account_address }) => {
491 self.impersonate_account(account_address).await
492 }
493 DevnetSpecRequest::StopImpersonateAccount(AccountAddressInput { account_address }) => {
494 self.stop_impersonating_account(account_address).await
495 }
496 DevnetSpecRequest::AutoImpersonate => self.set_auto_impersonate(true).await,
497 DevnetSpecRequest::StopAutoImpersonate => self.set_auto_impersonate(false).await,
498 DevnetSpecRequest::Dump(path) => self.dump(path).await,
499 DevnetSpecRequest::Load(LoadPath { path }) => self.load(path).await,
500 DevnetSpecRequest::PostmanLoadL1MessagingContract(data) => {
501 self.postman_load(data).await
502 }
503 DevnetSpecRequest::PostmanFlush(data) => self.postman_flush(data).await,
504 DevnetSpecRequest::PostmanSendMessageToL2(message) => {
505 self.postman_send_message_to_l2(message).await
506 }
507 DevnetSpecRequest::PostmanConsumeMessageFromL2(message) => {
508 self.postman_consume_message_from_l2(message).await
509 }
510 DevnetSpecRequest::CreateBlock => self.create_block().await,
511 DevnetSpecRequest::AbortBlocks(data) => self.abort_blocks(data).await,
512 DevnetSpecRequest::AcceptOnL1(data) => self.accept_on_l1(data).await,
513 DevnetSpecRequest::SetGasPrice(data) => self.set_gas_price(data).await,
514 DevnetSpecRequest::Restart(data) => self.restart(data).await,
515 DevnetSpecRequest::SetTime(data) => self.set_time(data).await,
516 DevnetSpecRequest::IncreaseTime(data) => self.increase_time(data).await,
517 DevnetSpecRequest::PredeployedAccounts(data) => {
518 self.get_predeployed_accounts(data).await
519 }
520 DevnetSpecRequest::AccountBalance(data) => self.get_account_balance(data).await,
521 DevnetSpecRequest::Mint(data) => self.mint(data).await,
522 DevnetSpecRequest::DevnetConfig => self.get_devnet_config().await,
523 }
524 }
525
526 async fn on_websocket_call(
528 &self,
529 bytes: &[u8],
530 ws: Arc<Mutex<SplitSink<WebSocket, Message>>>,
531 socket_id: SocketId,
532 ) {
533 let error_serialized = match serde_json::from_slice(bytes) {
534 Ok(rpc_call) => match self.on_websocket_rpc_call(&rpc_call, socket_id).await {
535 Ok(_) => return,
536 Err(e) => serde_json::to_string(&RpcResponse::from_rpc_error(e, rpc_call.id))
537 .unwrap_or_default(),
538 },
539 Err(e) => serde_json::to_string(&RpcResponse::from_rpc_error(
540 RpcError::parse_error(e.to_string()),
541 rpc_core::request::Id::Null,
542 ))
543 .unwrap_or_default(),
544 };
545
546 if let Err(e) = ws.lock().await.send(Message::Text(error_serialized.into())).await {
547 tracing::error!("Error sending websocket message: {e}");
548 }
549 }
550
551 fn allows_method(&self, method: &str) -> bool {
552 if let Some(restricted_methods) = &self.api.server_config.restricted_methods {
553 if is_json_rpc_method_restricted(method, restricted_methods) {
554 return false;
555 }
556 }
557
558 true
559 }
560
561 async fn on_websocket_rpc_call(
566 &self,
567 call: &RpcMethodCall,
568 socket_id: SocketId,
569 ) -> Result<(), RpcError> {
570 trace!(target: "rpc", id = ?call.id, method = ?call.method, "received websocket call");
571
572 let timer = std::time::Instant::now();
573 let method = call.method.clone();
574
575 let req: JsonRpcWsRequest = to_json_rpc_request(call)?;
576 let result = match req {
577 JsonRpcWsRequest::OneTimeRequest(req) => {
578 let resp_result = self.on_request(*req, call.clone()).await;
579 let mut sockets = self.api.sockets.lock().await;
580
581 let socket_context =
582 sockets.get_mut(&socket_id).map_err(|e| e.api_error_to_rpc_error())?;
583
584 match resp_result {
585 ResponseResult::Success(result_value) => {
586 socket_context.send_rpc_response(result_value, call.id.clone()).await;
587 Ok(())
588 }
589 ResponseResult::Error(rpc_error) => Err(rpc_error),
590 }
591 }
592 JsonRpcWsRequest::SubscriptionRequest(req) => self
593 .execute_ws_subscription(req, call.id.clone(), socket_id)
594 .await
595 .map_err(|e| e.api_error_to_rpc_error()),
596 };
597
598 let duration = timer.elapsed().as_secs_f64();
600 let status = if result.is_ok() { "success" } else { "error" };
601 crate::metrics::RPC_CALL_DURATION.with_label_values(&[&method]).observe(duration);
602 crate::metrics::RPC_CALL_COUNT.with_label_values(&[&method, status]).inc();
603
604 result
605 }
606
607 async fn update_dump(&self, event: &RpcMethodCall) -> Result<(), RpcError> {
608 match self.api.config.dump_on {
609 Some(DumpOn::Block) => {
610 let dump_path = self
611 .api
612 .config
613 .dump_path
614 .as_deref()
615 .ok_or(RpcError::internal_error_with("Undefined dump_path"))?;
616
617 dump_event(event, dump_path).map_err(|e| {
618 let msg = format!("Failed dumping of {}: {e}", event.method);
619 RpcError::internal_error_with(msg)
620 })?;
621 }
622 Some(DumpOn::Request | DumpOn::Exit) => {
623 self.api.dumpable_events.lock().await.push(event.clone())
624 }
625 None => (),
626 }
627
628 Ok(())
629 }
630
631 pub async fn re_execute(&self, events: &[RpcMethodCall]) -> Result<(), RpcError> {
632 for event in events {
633 if let ResponseResult::Error(e) = self.on_call(event.clone()).await.result {
634 return Err(e);
635 }
636 }
637 Ok(())
638 }
639}