foundry_fork_db/backend.rs

1//! Smart caching and deduplication of requests when using a forking provider.
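//!
//! The main entry point is [`SharedBackend`]: it talks to a spawned [`BackendHandler`] over a
//! channel and caches everything it fetches in a `BlockchainDb`.
//!
//! A minimal usage sketch follows. The endpoint URL is a placeholder, the import paths are
//! assumptions about the crate's public module layout, and the snippet is intentionally not
//! compiled as a doctest:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use alloy_provider::{network::AnyNetwork, ProviderBuilder};
//! use alloy_rpc_client::ClientBuilder;
//! use foundry_fork_db::{backend::SharedBackend, cache::{BlockchainDb, BlockchainDbMeta}};
//! use revm::db::DatabaseRef;
//!
//! # async fn example() -> eyre::Result<()> {
//! // provider for the fork endpoint, built the same way as in the tests below
//! let provider = ProviderBuilder::new()
//!     .network::<AnyNetwork>()
//!     .on_client(ClientBuilder::default().http("https://rpc.example.invalid".parse()?));
//!
//! // in-memory cache; pass `Some(path)` instead of `None` to persist it as JSON
//! let db = BlockchainDb::new(BlockchainDbMeta::default(), None);
//!
//! // spawn the handler on the current tokio runtime; clones of `backend` share the same cache
//! let backend = SharedBackend::spawn_backend(Arc::new(provider), db, None).await;
//!
//! // blocking, deduplicated reads through the non-async `DatabaseRef` API
//! let account = backend.basic_ref("0x0000000000000000000000000000000000000000".parse()?)?;
//! # Ok(())
//! # }
//! ```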
2
3use crate::{
4    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5    error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction, AnyTxEnvelope},
10    Provider,
11};
12use alloy_rpc_types::{BlockId, Transaction};
13use alloy_serde::WithOtherFields;
14use eyre::WrapErr;
15use futures::{
16    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
17    stream::Stream,
18    task::{Context, Poll},
19    Future, FutureExt,
20};
21use revm::{
22    db::DatabaseRef,
23    primitives::{
24        map::{hash_map::Entry, AddressHashMap, HashMap},
25        AccountInfo, Bytecode, KECCAK_EMPTY,
26    },
27};
28use std::{
29    collections::VecDeque,
30    fmt,
31    future::IntoFuture,
32    path::Path,
33    pin::Pin,
34    sync::{
35        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
36        Arc,
37    },
38};
39
40/// Logged when an error indicates that the user is trying to fork from a non-archive node.
41pub const NON_ARCHIVE_NODE_WARNING: &str = "\
42It looks like you're trying to fork from an older block with a non-archive node, which is not \
43supported. Please try to change your RPC URL to an archive node if the issue persists.";
44
45// Various future/request type aliases
46
47type AccountFuture<Err> =
48    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
49type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
50type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
51type FullBlockFuture<Err> = Pin<
52    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
53>;
54type TransactionFuture<Err> =
55    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;
56
57type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
58type StorageSender = OneshotSender<DatabaseResult<U256>>;
59type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
60type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
61type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;
62
63type AddressData = AddressHashMap<AccountInfo>;
64type StorageData = AddressHashMap<StorageInfo>;
65type BlockHashData = HashMap<U256, B256>;
66
67struct AnyRequestFuture<T, Err> {
68    sender: OneshotSender<Result<T, Err>>,
69    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
70}
71
72impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
75    }
76}
77
78trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
79    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
80}
81
82/// Implements `WrappedAnyRequest` for `AnyRequestFuture`.
83///
84/// - `poll_inner` is similar to `Future::poll`, but it intentionally swallows the typed output:
85///   it drives the inner `Future<Output = T>` and only reports readiness (`Poll<()>`).
86/// - This design avoids storing a `Future<Output = T>` directly, as its concrete type is not
87///   known to the handler at compile time.
88/// - Instead, the result (`Result<T, Err>`) is sent via the `sender` channel, which enforces type
89///   safety.
90impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
91where
92    T: fmt::Debug + Send + 'static,
93    Err: fmt::Debug + Send + 'static,
94{
95    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
96        match self.future.poll_unpin(cx) {
97            Poll::Ready(result) => {
98                let _ = self.sender.send(result);
99                Poll::Ready(())
100            }
101            Poll::Pending => Poll::Pending,
102        }
103    }
104}
105
106/// Request variants that are executed by the provider
107enum ProviderRequest<Err> {
108    Account(AccountFuture<Err>),
109    Storage(StorageFuture<Err>),
110    BlockHash(BlockHashFuture<Err>),
111    FullBlock(FullBlockFuture<Err>),
112    Transaction(TransactionFuture<Err>),
113    AnyRequest(Box<dyn WrappedAnyRequest>),
114}
115
116/// The Request type the Backend listens for
117#[derive(Debug)]
118enum BackendRequest {
119    /// Fetch the account info
120    Basic(Address, AccountInfoSender),
121    /// Fetch a storage slot
122    Storage(Address, U256, StorageSender),
123    /// Fetch a block hash
124    BlockHash(u64, BlockHashSender),
125    /// Fetch an entire block with transactions
126    FullBlock(BlockId, FullBlockSender),
127    /// Fetch a transaction
128    Transaction(B256, TransactionSender),
129    /// Sets the pinned block to fetch data from
130    SetPinnedBlock(BlockId),
131
132    /// Update Address data
133    UpdateAddress(AddressData),
134    /// Update Storage data
135    UpdateStorage(StorageData),
136    /// Update Block Hashes
137    UpdateBlockHash(BlockHashData),
138    /// Any other request
139    AnyRequest(Box<dyn WrappedAnyRequest>),
140}
141
142/// Handles an internal provider and listens for requests.
143///
144/// This handler will remain active as long as it is reachable (request channel still open) and
145/// requests are in progress.
146#[must_use = "futures do nothing unless polled"]
147pub struct BackendHandler<P> {
148    provider: P,
149    /// Stores all the data.
150    db: BlockchainDb,
151    /// Requests currently in progress
152    pending_requests: Vec<ProviderRequest<eyre::Report>>,
153    /// Listeners that wait for a `get_account` related response
154    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
155    /// Listeners that wait for a `get_storage_at` response
156    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
157    /// Listeners that wait for a `get_block` response
158    block_requests: HashMap<u64, Vec<BlockHashSender>>,
159    /// Incoming commands.
160    incoming: UnboundedReceiver<BackendRequest>,
161    /// unprocessed queued requests
162    queued_requests: VecDeque<BackendRequest>,
163    /// The block to fetch data from.
164    // This is an `Option` so that we can have less code churn in the functions below
165    block_id: Option<BlockId>,
166}
167
168impl<P> BackendHandler<P>
169where
170    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
171{
172    fn new(
173        provider: P,
174        db: BlockchainDb,
175        rx: UnboundedReceiver<BackendRequest>,
176        block_id: Option<BlockId>,
177    ) -> Self {
178        Self {
179            provider,
180            db,
181            pending_requests: Default::default(),
182            account_requests: Default::default(),
183            storage_requests: Default::default(),
184            block_requests: Default::default(),
185            queued_requests: Default::default(),
186            incoming: rx,
187            block_id,
188        }
189    }
190
191    /// Handles a queued request.
192    ///
193    /// We always check:
194    ///  1. if the requested value is already stored in the cache, then answer the sender
195    ///  2. otherwise, fetch it via the provider but check if a request for that value is already in
196    ///     progress (e.g. another Sender just requested the same account)
197    fn on_request(&mut self, req: BackendRequest) {
198        match req {
199            BackendRequest::Basic(addr, sender) => {
200                trace!(target: "backendhandler", "received request basic address={:?}", addr);
201                let acc = self.db.accounts().read().get(&addr).cloned();
202                if let Some(basic) = acc {
203                    let _ = sender.send(Ok(basic));
204                } else {
205                    self.request_account(addr, sender);
206                }
207            }
208            BackendRequest::BlockHash(number, sender) => {
209                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
210                if let Some(hash) = hash {
211                    let _ = sender.send(Ok(hash));
212                } else {
213                    self.request_hash(number, sender);
214                }
215            }
216            BackendRequest::FullBlock(number, sender) => {
217                self.request_full_block(number, sender);
218            }
219            BackendRequest::Transaction(tx, sender) => {
220                self.request_transaction(tx, sender);
221            }
222            BackendRequest::Storage(addr, idx, sender) => {
223                // check if the storage slot is already cached
224                let value =
225                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
226                if let Some(value) = value {
227                    let _ = sender.send(Ok(value));
228                } else {
229                    // account present but not storage -> fetch storage
230                    self.request_account_storage(addr, idx, sender);
231                }
232            }
233            BackendRequest::SetPinnedBlock(block_id) => {
234                self.block_id = Some(block_id);
235            }
236            BackendRequest::UpdateAddress(address_data) => {
237                for (address, data) in address_data {
238                    self.db.accounts().write().insert(address, data);
239                }
240            }
241            BackendRequest::UpdateStorage(storage_data) => {
242                for (address, data) in storage_data {
243                    self.db.storage().write().insert(address, data);
244                }
245            }
246            BackendRequest::UpdateBlockHash(block_hash_data) => {
247                for (block, hash) in block_hash_data {
248                    self.db.block_hashes().write().insert(block, hash);
249                }
250            }
251            BackendRequest::AnyRequest(fut) => {
252                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
253            }
254        }
255    }
256
257    /// process a request for an account's storage slot
258    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
259        match self.storage_requests.entry((address, idx)) {
260            Entry::Occupied(mut entry) => {
261                entry.get_mut().push(listener);
262            }
263            Entry::Vacant(entry) => {
264                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
265                entry.insert(vec![listener]);
266                let provider = self.provider.clone();
267                let block_id = self.block_id.unwrap_or_default();
268                let fut = Box::pin(async move {
269                    let storage = provider
270                        .get_storage_at(address, idx)
271                        .block_id(block_id)
272                        .await
273                        .map_err(Into::into);
274                    (storage, address, idx)
275                });
276                self.pending_requests.push(ProviderRequest::Storage(fut));
277            }
278        }
279    }
280
281    /// returns the future that fetches the account data
282    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
283        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
284        let provider = self.provider.clone();
285        let block_id = self.block_id.unwrap_or_default();
286        let fut = Box::pin(async move {
287            let balance = provider.get_balance(address).block_id(block_id).into_future();
288            let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
289            let code = provider.get_code_at(address).block_id(block_id).into_future();
290            let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
291            (resp, address)
292        });
293        ProviderRequest::Account(fut)
294    }
295
296    /// process a request for an account
297    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
298        match self.account_requests.entry(address) {
299            Entry::Occupied(mut entry) => {
300                entry.get_mut().push(listener);
301            }
302            Entry::Vacant(entry) => {
303                entry.insert(vec![listener]);
304                self.pending_requests.push(self.get_account_req(address));
305            }
306        }
307    }
308
309    /// process a request for an entire block
310    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
311        let provider = self.provider.clone();
312        let fut = Box::pin(async move {
313            let block = provider
314                .get_block(number, true.into())
315                .await
316                .wrap_err(format!("could not fetch block {number:?}"));
317            (sender, block, number)
318        });
319
320        self.pending_requests.push(ProviderRequest::FullBlock(fut));
321    }
322
323    /// process a request for a transaction
324    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
325        let provider = self.provider.clone();
326        let fut = Box::pin(async move {
327            let transaction = provider
328                .get_transaction_by_hash(tx)
329                .await
330                .wrap_err_with(|| format!("could not get transaction {tx}"))
331                .and_then(|maybe| {
332                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
333                });
334            (sender, transaction, tx)
335        });
336
337        self.pending_requests.push(ProviderRequest::Transaction(fut));
338    }
339
340    /// process a request for a block hash
341    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
342        match self.block_requests.entry(number) {
343            Entry::Occupied(mut entry) => {
344                entry.get_mut().push(listener);
345            }
346            Entry::Vacant(entry) => {
347                trace!(target: "backendhandler", number, "preparing block hash request");
348                entry.insert(vec![listener]);
349                let provider = self.provider.clone();
350                let fut = Box::pin(async move {
351                    let block = provider
352                        .get_block_by_number(
353                            number.into(),
354                            alloy_rpc_types::BlockTransactionsKind::Hashes,
355                        )
356                        .await
357                        .wrap_err("failed to get block");
358
359                    let block_hash = match block {
360                        Ok(Some(block)) => Ok(block.header.hash),
361                        Ok(None) => {
362                            warn!(target: "backendhandler", ?number, "block not found");
363                            // if no block was returned then the block does not exist, in which
364                            // case we return the empty hash (`KECCAK_EMPTY`)
365                            Ok(KECCAK_EMPTY)
366                        }
367                        Err(err) => {
368                            error!(target: "backendhandler", %err, ?number, "failed to get block");
369                            Err(err)
370                        }
371                    };
372                    (block_hash, number)
373                });
374                self.pending_requests.push(ProviderRequest::BlockHash(fut));
375            }
376        }
377    }
378}
379
380impl<P> Future for BackendHandler<P>
381where
382    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
383{
384    type Output = ();
385
386    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
387        let pin = self.get_mut();
388        loop {
389            // Drain queued requests first.
390            while let Some(req) = pin.queued_requests.pop_front() {
391                pin.on_request(req)
392            }
393
394            // receive new requests to delegate to the underlying provider
395            loop {
396                match Pin::new(&mut pin.incoming).poll_next(cx) {
397                    Poll::Ready(Some(req)) => {
398                        pin.queued_requests.push_back(req);
399                    }
400                    Poll::Ready(None) => {
401                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
402                        return Poll::Ready(());
403                    }
404                    Poll::Pending => break,
405                }
406            }
407
408            // poll all requests in progress
409            for n in (0..pin.pending_requests.len()).rev() {
410                let mut request = pin.pending_requests.swap_remove(n);
411                match &mut request {
412                    ProviderRequest::Account(fut) => {
413                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
414                            // get the response
415                            let (balance, nonce, code) = match resp {
416                                Ok(res) => res,
417                                Err(err) => {
418                                    let err = Arc::new(err);
419                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
420                                        listeners.into_iter().for_each(|l| {
421                                            let _ = l.send(Err(DatabaseError::GetAccount(
422                                                addr,
423                                                Arc::clone(&err),
424                                            )));
425                                        })
426                                    }
427                                    continue;
428                                }
429                            };
430
431                            // convert it to revm-style types
432                            let (code, code_hash) = if !code.is_empty() {
433                                (code.clone(), keccak256(&code))
434                            } else {
435                                (Bytes::default(), KECCAK_EMPTY)
436                            };
437
438                            // update the cache
439                            let acc = AccountInfo {
440                                nonce,
441                                balance,
442                                code: Some(Bytecode::new_raw(code)),
443                                code_hash,
444                            };
445                            pin.db.accounts().write().insert(addr, acc.clone());
446
447                            // notify all listeners
448                            if let Some(listeners) = pin.account_requests.remove(&addr) {
449                                listeners.into_iter().for_each(|l| {
450                                    let _ = l.send(Ok(acc.clone()));
451                                })
452                            }
453                            continue;
454                        }
455                    }
456                    ProviderRequest::Storage(fut) => {
457                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
458                            let value = match resp {
459                                Ok(value) => value,
460                                Err(err) => {
461                                    // notify all listeners
462                                    let err = Arc::new(err);
463                                    if let Some(listeners) =
464                                        pin.storage_requests.remove(&(addr, idx))
465                                    {
466                                        listeners.into_iter().for_each(|l| {
467                                            let _ = l.send(Err(DatabaseError::GetStorage(
468                                                addr,
469                                                idx,
470                                                Arc::clone(&err),
471                                            )));
472                                        })
473                                    }
474                                    continue;
475                                }
476                            };
477
478                            // update the cache
479                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);
480
481                            // notify all listeners
482                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
483                                listeners.into_iter().for_each(|l| {
484                                    let _ = l.send(Ok(value));
485                                })
486                            }
487                            continue;
488                        }
489                    }
490                    ProviderRequest::BlockHash(fut) => {
491                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
492                            let value = match block_hash {
493                                Ok(value) => value,
494                                Err(err) => {
495                                    let err = Arc::new(err);
496                                    // notify all listeners
497                                    if let Some(listeners) = pin.block_requests.remove(&number) {
498                                        listeners.into_iter().for_each(|l| {
499                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
500                                                number,
501                                                Arc::clone(&err),
502                                            )));
503                                        })
504                                    }
505                                    continue;
506                                }
507                            };
508
509                            // update the cache
510                            pin.db.block_hashes().write().insert(U256::from(number), value);
511
512                            // notify all listeners
513                            if let Some(listeners) = pin.block_requests.remove(&number) {
514                                listeners.into_iter().for_each(|l| {
515                                    let _ = l.send(Ok(value));
516                                })
517                            }
518                            continue;
519                        }
520                    }
521                    ProviderRequest::FullBlock(fut) => {
522                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
523                            let msg = match resp {
524                                Ok(Some(block)) => Ok(block),
525                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
526                                Err(err) => {
527                                    let err = Arc::new(err);
528                                    Err(DatabaseError::GetFullBlock(number, err))
529                                }
530                            };
531                            let _ = sender.send(msg);
532                            continue;
533                        }
534                    }
535                    ProviderRequest::Transaction(fut) => {
536                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
537                            let msg = match tx {
538                                Ok(tx) => Ok(tx),
539                                Err(err) => {
540                                    let err = Arc::new(err);
541                                    Err(DatabaseError::GetTransaction(tx_hash, err))
542                                }
543                            };
544                            let _ = sender.send(msg);
545                            continue;
546                        }
547                    }
548                    ProviderRequest::AnyRequest(fut) => {
549                        if fut.poll_inner(cx).is_ready() {
550                            continue;
551                        }
552                    }
553                }
554                // not ready, insert and poll again
555                pin.pending_requests.push(request);
556            }
557
558            // If no new requests have been queued, return `Pending`
559            // so the task gets polled again later.
560            if pin.queued_requests.is_empty() {
561                return Poll::Pending;
562            }
563        }
564    }
565}
566
567/// Determines how the `SharedBackend` blocks in the non-async [`DatabaseRef`] methods when
568/// interacting with the [`BackendHandler`].
569#[derive(Default, Clone, Debug, PartialEq)]
570pub enum BlockingMode {
571    /// This mode uses `tokio::task::block_in_place()` to block in place.
572    ///
573    /// This should be used when blocking on the call site is disallowed.
574    #[default]
575    BlockInPlace,
576    /// This mode blocks directly on the current thread.
577    ///
578    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
579    Block,
580}
581
582impl BlockingMode {
583    /// Runs the given closure according to the blocking mode.
584    pub fn run<F, R>(&self, f: F) -> R
585    where
586        F: FnOnce() -> R,
587    {
588        match self {
589            Self::BlockInPlace => tokio::task::block_in_place(f),
590            Self::Block => f(),
591        }
592    }
593}
594
595/// A cloneable backend type that shares access to the backend data with all its clones.
596///
597/// This backend type is connected to the `BackendHandler` via a mpsc unbounded channel. The
598/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
599/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
600/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
601/// `Backend` type is thread safe.
602///
603/// All [`DatabaseRef`] methods are delegated as a `BackendRequest` via the channel to the
604/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
605/// that is used by the `BackendHandler` to send the result of an executed `BackendRequest` back to
606/// `SharedBackend`.
607///
608/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
609/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
610/// bundles them together, so that always only one provider request is executed. For example, there
611/// are two `SharedBackend`s, `A` and `B`, both request the basic account info of account
612/// `0xasd9sa7d...` at the same time. After the `BackendHandler` receives the request from `A`, it
613/// sends a new provider request to the provider's endpoint, then it reads the identical request
614/// from `B` and simply adds it as an additional listener for the request already in progress,
615/// instead of sending another one, so that once the provider returns the response, all listeners
616/// (`A` and `B`) are notified.
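///
/// A sketch of that sharing behavior (the address is a placeholder, `backend` is assumed to come
/// from [`SharedBackend::spawn_backend`], and the snippet is not compiled as a doctest):
///
/// ```ignore
/// use revm::db::DatabaseRef;
///
/// let addr: alloy_primitives::Address =
///     "0x0000000000000000000000000000000000000000".parse().unwrap();
/// let (a, b) = (backend.clone(), backend.clone());
///
/// // Both threads request the same account; the `BackendHandler` issues a single provider
/// // request and notifies both listeners, after which the result is served from the cache.
/// let t1 = std::thread::spawn(move || a.basic_ref(addr).unwrap());
/// let t2 = std::thread::spawn(move || b.basic_ref(addr).unwrap());
/// assert_eq!(t1.join().unwrap(), t2.join().unwrap());
/// ```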
617// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
618// the underlying [BackendHandler] which runs on a separate spawned tokio task.
619// [tokio::task::block_in_place()]
620// > Runs the provided blocking function on the current thread without blocking the executor.
621// This prevents issues (hangs) we ran into where the [SharedBackend] itself is called from a spawned
622// task.
623#[derive(Clone, Debug)]
624pub struct SharedBackend {
625    /// channel used for sending commands related to database operations
626    backend: UnboundedSender<BackendRequest>,
627    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
628    ///
629    /// There is only one instance of the type, so as soon as the last `SharedBackend` is dropped,
630    /// `FlushJsonBlockCacheDB` is also dropped and the cache is flushed.
631    cache: Arc<FlushJsonBlockCacheDB>,
632
633    /// The blocking mode used when issuing requests to the `BackendHandler`.
634    blocking_mode: BlockingMode,
635}
636
637impl SharedBackend {
638    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
639    /// `SharedBackend`. Missing values get inserted in the `db`.
640    ///
641    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
642    /// dropped.
643    ///
644    /// NOTE: this should be called with `Arc<Provider>`
645    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
646    where
647        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
648    {
649        let (shared, handler) = Self::new(provider, db, pin_block);
650        // spawn the provider handler to a task
651        trace!(target: "backendhandler", "spawning Backendhandler task");
652        tokio::spawn(handler);
653        shared
654    }
655
656    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
657    /// its own `tokio::Runtime`
658    pub fn spawn_backend_thread<P>(
659        provider: P,
660        db: BlockchainDb,
661        pin_block: Option<BlockId>,
662    ) -> Self
663    where
664        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
665    {
666        let (shared, handler) = Self::new(provider, db, pin_block);
667
668        // spawn a light-weight thread with a thread-local async runtime just for
669        // sending and receiving data from the remote client
670        std::thread::Builder::new()
671            .name("fork-backend".into())
672            .spawn(move || {
673                let rt = tokio::runtime::Builder::new_current_thread()
674                    .enable_all()
675                    .build()
676                    .expect("failed to build tokio runtime");
677
678                rt.block_on(handler);
679            })
680            .expect("failed to spawn thread");
681        trace!(target: "backendhandler", "spawned Backendhandler thread");
682
683        shared
684    }
685
686    /// Returns a new `SharedBackend` and the `BackendHandler`
687    pub fn new<P>(
688        provider: P,
689        db: BlockchainDb,
690        pin_block: Option<BlockId>,
691    ) -> (Self, BackendHandler<P>)
692    where
693        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
694    {
695        let (backend, backend_rx) = unbounded();
696        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
697        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
698        (Self { backend, cache, blocking_mode: Default::default() }, handler)
699    }
700
701    /// Returns a copy of this `SharedBackend` that uses the given blocking mode.
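    ///
    /// A sketch of when this is useful (the surrounding `spawn_blocking` context and the
    /// `backend`/`address` bindings are assumptions of the example, not requirements of the API):
    ///
    /// ```ignore
    /// use revm::db::DatabaseRef;
    ///
    /// // On a dedicated blocking thread there is no executor to protect, so the plain
    /// // `BlockingMode::Block` can be used instead of `tokio::task::block_in_place`.
    /// let blocking_backend = backend.with_blocking_mode(BlockingMode::Block);
    /// tokio::task::spawn_blocking(move || {
    ///     let _ = blocking_backend.basic_ref(address);
    /// });
    /// ```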
702    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
703        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
704    }
705
706    /// Updates the pinned block to fetch data from
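    ///
    /// For example (a sketch; the block number is a placeholder):
    ///
    /// ```ignore
    /// // All subsequent account, storage and block hash fetches resolve against block 1_000_000.
    /// backend.set_pinned_block(1_000_000u64)?;
    /// ```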
707    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
708        let req = BackendRequest::SetPinnedBlock(block.into());
709        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
710    }
711
712    /// Returns the full block for the given block identifier
713    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
714        self.blocking_mode.run(|| {
715            let (sender, rx) = oneshot_channel();
716            let req = BackendRequest::FullBlock(block.into(), sender);
717            self.backend.unbounded_send(req)?;
718            rx.recv()?
719        })
720    }
721
722    /// Returns the transaction for the hash
723    pub fn get_transaction(
724        &self,
725        tx: B256,
726    ) -> DatabaseResult<WithOtherFields<Transaction<AnyTxEnvelope>>> {
727        self.blocking_mode.run(|| {
728            let (sender, rx) = oneshot_channel();
729            let req = BackendRequest::Transaction(tx, sender);
730            self.backend.unbounded_send(req)?;
731            rx.recv()?
732        })
733    }
734
735    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
736        self.blocking_mode.run(|| {
737            let (sender, rx) = oneshot_channel();
738            let req = BackendRequest::Basic(address, sender);
739            self.backend.unbounded_send(req)?;
740            rx.recv()?.map(Some)
741        })
742    }
743
744    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
745        self.blocking_mode.run(|| {
746            let (sender, rx) = oneshot_channel();
747            let req = BackendRequest::Storage(address, index, sender);
748            self.backend.unbounded_send(req)?;
749            rx.recv()?
750        })
751    }
752
753    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
754        self.blocking_mode.run(|| {
755            let (sender, rx) = oneshot_channel();
756            let req = BackendRequest::BlockHash(number, sender);
757            self.backend.unbounded_send(req)?;
758            rx.recv()?
759        })
760    }
761
762    /// Inserts or updates data for multiple addresses
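    ///
    /// A sketch of seeding the cache (the `address` binding and account values are placeholders):
    ///
    /// ```ignore
    /// let mut accounts = AddressData::default();
    /// accounts.insert(
    ///     address,
    ///     AccountInfo { nonce: 1, balance: U256::from(1_000u64), code: None, code_hash: KECCAK_EMPTY },
    /// );
    /// // later `basic_ref(address)` calls are answered from the cache without hitting the provider
    /// backend.insert_or_update_address(accounts);
    /// ```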
763    pub fn insert_or_update_address(&self, address_data: AddressData) {
764        let req = BackendRequest::UpdateAddress(address_data);
765        let err = self.backend.unbounded_send(req);
766        match err {
767            Ok(_) => (),
768            Err(e) => {
769                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
770            }
771        }
772    }
773
774    /// Inserts or updates data for multiple storage slots
775    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
776        let req = BackendRequest::UpdateStorage(storage_data);
777        let err = self.backend.unbounded_send(req);
778        match err {
779            Ok(_) => (),
780            Err(e) => {
781                error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e)
782            }
783        }
784    }
785
786    /// Inserts or updates data for multiple block hashes
787    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
788        let req = BackendRequest::UpdateBlockHash(block_hash_data);
789        let err = self.backend.unbounded_send(req);
790        match err {
791            Ok(_) => (),
792            Err(e) => {
793                error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e)
794            }
795        }
796    }
797
798    /// Executes an arbitrary request against the provider and returns the result.
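    ///
    /// A sketch based on the `shared_backend_any_request` test below (the RPC method name and the
    /// `provider`/`backend` bindings are illustrative; note that this method takes `&mut self`):
    ///
    /// ```ignore
    /// // any future resolving to `Result<T, eyre::Report>` can be routed through the handler
    /// let provider = provider.clone();
    /// let bytes: alloy_primitives::Bytes = backend.do_any_request(async move {
    ///     Ok(provider.raw_request("foo_callCustomMethod".into(), vec!["0x01"]).await?)
    /// })?;
    /// ```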
799    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
800    where
801        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
802        T: fmt::Debug + Send + 'static,
803    {
804        self.blocking_mode.run(|| {
805            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
806            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
807                sender,
808                future: Box::pin(fut),
809            }));
810            self.backend.unbounded_send(req)?;
811            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
812        })
813    }
814
815    /// Flushes the DB to disk if caching is enabled
816    pub fn flush_cache(&self) {
817        self.cache.0.flush();
818    }
819
820    /// Flushes the DB to a specific file
821    pub fn flush_cache_to(&self, cache_path: &Path) {
822        self.cache.0.flush_to(cache_path);
823    }
824
825    /// Returns the DB
826    pub fn data(&self) -> Arc<MemDb> {
827        self.cache.0.db().clone()
828    }
829
830    /// Returns the DB accounts
831    pub fn accounts(&self) -> AddressData {
832        self.cache.0.db().accounts.read().clone()
833    }
834
835    /// Returns the DB accounts length
836    pub fn accounts_len(&self) -> usize {
837        self.cache.0.db().accounts.read().len()
838    }
839
840    /// Returns the DB storage
841    pub fn storage(&self) -> StorageData {
842        self.cache.0.db().storage.read().clone()
843    }
844
845    /// Returns the DB storage length
846    pub fn storage_len(&self) -> usize {
847        self.cache.0.db().storage.read().len()
848    }
849
850    /// Returns the DB block_hashes
851    pub fn block_hashes(&self) -> BlockHashData {
852        self.cache.0.db().block_hashes.read().clone()
853    }
854
855    /// Returns the DB block_hashes length
856    pub fn block_hashes_len(&self) -> usize {
857        self.cache.0.db().block_hashes.read().len()
858    }
859}
860
861impl DatabaseRef for SharedBackend {
862    type Error = DatabaseError;
863
864    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
865        trace!(target: "sharedbackend", %address, "request basic");
866        self.do_get_basic(address).map_err(|err| {
867            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
868            if err.is_possibly_non_archive_node_error() {
869                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
870            }
871            err
872        })
873    }
874
875    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
876        Err(DatabaseError::MissingCode(hash))
877    }
878
879    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
880        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
881        self.do_get_storage(address, index).map_err(|err| {
882            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
883            if err.is_possibly_non_archive_node_error() {
884                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
885            }
886            err
887        })
888    }
889
890    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
891        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
892        self.do_get_block_hash(number).map_err(|err| {
893            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
894            if err.is_possibly_non_archive_node_error() {
895                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
896            }
897            err
898        })
899    }
900}
901
902#[cfg(test)]
903mod tests {
904    use super::*;
905    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
906    use alloy_provider::ProviderBuilder;
907    use alloy_rpc_client::ClientBuilder;
908    use serde::Deserialize;
909    use std::{collections::BTreeSet, fs, path::PathBuf};
910    use tiny_http::{Response, Server};
911
912    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
913        ProviderBuilder::new()
914            .network::<AnyNetwork>()
915            .on_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
916    }
917
918    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
919
920    #[tokio::test(flavor = "multi_thread")]
921    async fn test_builder() {
922        let Some(endpoint) = ENDPOINT else { return };
923        let provider = get_http_provider(endpoint);
924
925        let any_rpc_block = provider
926            .get_block(BlockId::latest(), alloy_rpc_types::BlockTransactionsKind::Hashes)
927            .await
928            .unwrap()
929            .unwrap();
930        let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
931    }
932
933    #[tokio::test(flavor = "multi_thread")]
934    async fn shared_backend() {
935        let Some(endpoint) = ENDPOINT else { return };
936
937        let provider = get_http_provider(endpoint);
938        let meta = BlockchainDbMeta {
939            cfg_env: Default::default(),
940            block_env: Default::default(),
941            hosts: BTreeSet::from([endpoint.to_string()]),
942        };
943
944        let db = BlockchainDb::new(meta, None);
945        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
946
947        // some rng contract from etherscan
948        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
949
950        let idx = U256::from(0u64);
951        let value = backend.storage_ref(address, idx).unwrap();
952        let account = backend.basic_ref(address).unwrap().unwrap();
953
954        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
955        assert_eq!(account.balance, mem_acc.balance);
956        assert_eq!(account.nonce, mem_acc.nonce);
957        let slots = db.storage().read().get(&address).unwrap().clone();
958        assert_eq!(slots.len(), 1);
959        assert_eq!(slots.get(&idx).copied().unwrap(), value);
960
961        let num = 10u64;
962        let hash = backend.block_hash_ref(num).unwrap();
963        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
964        assert_eq!(hash, mem_hash);
965
966        let max_slots = 5;
967        let handle = std::thread::spawn(move || {
968            for i in 1..max_slots {
969                let idx = U256::from(i);
970                let _ = backend.storage_ref(address, idx);
971            }
972        });
973        handle.join().unwrap();
974        let slots = db.storage().read().get(&address).unwrap().clone();
975        assert_eq!(slots.len() as u64, max_slots);
976    }
977
978    #[test]
979    fn can_read_cache() {
980        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
981        let json = JsonBlockCacheDB::load(cache_path).unwrap();
982        assert!(!json.db().accounts.read().is_empty());
983    }
984
985    #[tokio::test(flavor = "multi_thread")]
986    async fn can_modify_address() {
987        let Some(endpoint) = ENDPOINT else { return };
988
989        let provider = get_http_provider(endpoint);
990        let meta = BlockchainDbMeta {
991            cfg_env: Default::default(),
992            block_env: Default::default(),
993            hosts: BTreeSet::from([endpoint.to_string()]),
994        };
995
996        let db = BlockchainDb::new(meta, None);
997        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
998
999        // some rng contract from etherscan
1000        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1001
1002        let new_acc = AccountInfo {
1003            nonce: 1000u64,
1004            balance: U256::from(2000),
1005            code: None,
1006            code_hash: KECCAK_EMPTY,
1007        };
1008        let mut account_data = AddressData::default();
1009        account_data.insert(address, new_acc.clone());
1010
1011        backend.insert_or_update_address(account_data);
1012
1013        let max_slots = 5;
1014        let handle = std::thread::spawn(move || {
1015            for i in 1..max_slots {
1016                let idx = U256::from(i);
1017                let result_address = backend.basic_ref(address).unwrap();
1018                match result_address {
1019                    Some(acc) => {
1020                        assert_eq!(
1021                            acc.nonce, new_acc.nonce,
1022                            "The nonce was not changed in instance of index {}",
1023                            idx
1024                        );
1025                        assert_eq!(
1026                            acc.balance, new_acc.balance,
1027                            "The balance was not changed in instance of index {}",
1028                            idx
1029                        );
1030
1031                        // comparing with db
1032                        let db_address = {
1033                            let accounts = db.accounts().read();
1034                            accounts.get(&address).unwrap().clone()
1035                        };
1036
1037                        assert_eq!(
1038                            db_address.nonce, new_acc.nonce,
1039                            "The nonce was not changed in instance of index {}",
1040                            idx
1041                        );
1042                        assert_eq!(
1043                            db_address.balance, new_acc.balance,
1044                            "The balance was not changed in instance of index {}",
1045                            idx
1046                        );
1047                    }
1048                    None => panic!("Account not found"),
1049                }
1050            }
1051        });
1052        handle.join().unwrap();
1053    }
1054
1055    #[tokio::test(flavor = "multi_thread")]
1056    async fn can_modify_storage() {
1057        let Some(endpoint) = ENDPOINT else { return };
1058
1059        let provider = get_http_provider(endpoint);
1060        let meta = BlockchainDbMeta {
1061            cfg_env: Default::default(),
1062            block_env: Default::default(),
1063            hosts: BTreeSet::from([endpoint.to_string()]),
1064        };
1065
1066        let db = BlockchainDb::new(meta, None);
1067        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1068
1069        // some rng contract from etherscan
1070        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1071
1072        let mut storage_data = StorageData::default();
1073        let mut storage_info = StorageInfo::default();
1074        storage_info.insert(U256::from(20), U256::from(10));
1075        storage_info.insert(U256::from(30), U256::from(15));
1076        storage_info.insert(U256::from(40), U256::from(20));
1077
1078        storage_data.insert(address, storage_info);
1079
1080        backend.insert_or_update_storage(storage_data.clone());
1081
1082        let max_slots = 5;
1083        let handle = std::thread::spawn(move || {
1084            for _ in 1..max_slots {
1085                for (address, info) in &storage_data {
1086                    for (index, value) in info {
1087                        let result_storage = backend.do_get_storage(*address, *index);
1088                        match result_storage {
1089                            Ok(stg_db) => {
1090                                assert_eq!(
1091                                    stg_db, *value,
1092                                    "Storage in slot number {} in address {} does not have the same value", index, address
1093                                );
1094
1095                                let db_result = {
1096                                    let storage = db.storage().read();
1097                                    let address_storage = storage.get(address).unwrap();
1098                                    *address_storage.get(index).unwrap()
1099                                };
1100
1101                                assert_eq!(
1102                                    stg_db, db_result,
1103                                    "Storage in slot number {} in address {} does not have the same value", index, address
1104                                )
1105                            }
1106
1107                            Err(err) => {
1108                                panic!("There was a database error: {}", err)
1109                            }
1110                        }
1111                    }
1112                }
1113            }
1114        });
1115        handle.join().unwrap();
1116    }
1117
1118    #[tokio::test(flavor = "multi_thread")]
1119    async fn can_modify_block_hashes() {
1120        let Some(endpoint) = ENDPOINT else { return };
1121
1122        let provider = get_http_provider(endpoint);
1123        let meta = BlockchainDbMeta {
1124            cfg_env: Default::default(),
1125            block_env: Default::default(),
1126            hosts: BTreeSet::from([endpoint.to_string()]),
1127        };
1128
1129        let db = BlockchainDb::new(meta, None);
1130        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1131
1132        // some rng contract from etherscan
1133        // let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1134
1135        let mut block_hash_data = BlockHashData::default();
1136        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1137        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1138        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1139        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1140        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1141
1142        backend.insert_or_update_block_hashes(block_hash_data.clone());
1143
1144        let max_slots: u64 = 5;
1145        let handle = std::thread::spawn(move || {
1146            for i in 1..max_slots {
1147                let key = U256::from(i);
1148                let result_hash = backend.do_get_block_hash(i);
1149                match result_hash {
1150                    Ok(hash) => {
1151                        assert_eq!(
1152                            hash,
1153                            *block_hash_data.get(&key).unwrap(),
1154                            "The hash in block {} did not match",
1155                            key
1156                        );
1157
1158                        let db_result = {
1159                            let hashes = db.block_hashes().read();
1160                            *hashes.get(&key).unwrap()
1161                        };
1162
1163                        assert_eq!(hash, db_result, "The hash in block {} did not match", key);
1164                    }
1165                    Err(err) => panic!("Hash not found, error: {}", err),
1166                }
1167            }
1168        });
1169        handle.join().unwrap();
1170    }
1171
1172    #[tokio::test(flavor = "multi_thread")]
1173    async fn can_modify_storage_with_cache() {
1174        let Some(endpoint) = ENDPOINT else { return };
1175
1176        let provider = get_http_provider(endpoint);
1177        let meta = BlockchainDbMeta {
1178            cfg_env: Default::default(),
1179            block_env: Default::default(),
1180            hosts: BTreeSet::from([endpoint.to_string()]),
1181        };
1182
1183        // create a temporary copy of the cache file
1184        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1185
1186        let cache_path =
1187            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1188
1189        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1190        let backend =
1191            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1192
1193        // some rng contract from etherscan
1194        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1195
1196        let mut storage_data = StorageData::default();
1197        let mut storage_info = StorageInfo::default();
1198        storage_info.insert(U256::from(1), U256::from(10));
1199        storage_info.insert(U256::from(2), U256::from(15));
1200        storage_info.insert(U256::from(3), U256::from(20));
1201        storage_info.insert(U256::from(4), U256::from(20));
1202        storage_info.insert(U256::from(5), U256::from(15));
1203        storage_info.insert(U256::from(6), U256::from(10));
1204
1205        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1206        address_data.code = None;
1207
1208        storage_data.insert(address, storage_info);
1209
1210        backend.insert_or_update_storage(storage_data.clone());
1211
1212        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1213        // replace the code with some dummy bytecode
1214        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1215
1216        let mut account_data = AddressData::default();
1217        account_data.insert(address, new_acc.clone());
1218
1219        backend.insert_or_update_address(account_data);
1220
1221        let backend_clone = backend.clone();
1222
1223        let max_slots = 5;
1224        let handle = std::thread::spawn(move || {
1225            for _ in 1..max_slots {
1226                for (address, info) in &storage_data {
1227                    for (index, value) in info {
1228                        let result_storage = backend.do_get_storage(*address, *index);
1229                        match result_storage {
1230                            Ok(stg_db) => {
1231                                assert_eq!(
1232                                    stg_db, *value,
1233                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
1234                                );
1235
1236                                let db_result = {
1237                                    let storage = db.storage().read();
1238                                    let address_storage = storage.get(address).unwrap();
1239                                    *address_storage.get(index).unwrap()
1240                                };
1241
1242                                assert_eq!(
1243                                    stg_db, db_result,
1244                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
1245                                );
1246                            }
1247
1248                            Err(err) => {
1249                                panic!("There was a database error: {}", err)
1250                            }
1251                        }
1252                    }
1253                }
1254            }
1255
1256            backend_clone.flush_cache();
1257        });
1258        handle.join().unwrap();
1259
1260        // read json and confirm the changes to the data
1261
1262        let cache_path =
1263            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1264
1265        let json_db = BlockchainDb::new(meta, Some(cache_path));
1266
1267        let mut storage_data = StorageData::default();
1268        let mut storage_info = StorageInfo::default();
1269        storage_info.insert(U256::from(1), U256::from(10));
1270        storage_info.insert(U256::from(2), U256::from(15));
1271        storage_info.insert(U256::from(3), U256::from(20));
1272        storage_info.insert(U256::from(4), U256::from(20));
1273        storage_info.insert(U256::from(5), U256::from(15));
1274        storage_info.insert(U256::from(6), U256::from(10));
1275
1276        storage_data.insert(address, storage_info);
1277
1278        // redo the checks with the data extracted from the json file
1279        let max_slots = 5;
1280        let handle = std::thread::spawn(move || {
1281            for _ in 1..max_slots {
1282                for (address, info) in &storage_data {
1283                    for (index, value) in info {
1284                        let result_storage = {
1285                            let storage = json_db.storage().read();
1286                            let address_storage = storage.get(address).unwrap().clone();
1287                            *address_storage.get(index).unwrap()
1288                        };
1289
1290                        assert_eq!(
1291                            result_storage, *value,
1292                            "Storage in slot number {} in address {} doesn't have the same value",
1293                            index, address
1294                        );
1295                    }
1296                }
1297            }
1298        });
1299
1300        handle.join().unwrap();
1301
1302        // erase the temporary file
1303        fs::remove_file("test-data/storage-tmp.json").unwrap();
1304    }
1305
1306    #[tokio::test(flavor = "multi_thread")]
1307    async fn shared_backend_any_request() {
1308        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1309        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1310        let endpoint = format!("http://{}", server.server_addr());
1311
1312        // Spin up an in-memory server that responds to the "foo_callCustomMethod" RPC call.
1313        let expected_bytes_inner = expected_response_bytes.clone();
1314        let server_handle = std::thread::spawn(move || {
1315            #[derive(Debug, Deserialize)]
1316            struct Request {
1317                method: String,
1318            }
1319            let mut request = server.recv().unwrap();
1320            let rpc_request: Request =
1321                serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1322
1323            match rpc_request.method.as_str() {
1324                "foo_callCustomMethod" => request
1325                    .respond(Response::from_string(format!(
1326                        r#"{{"result": "{}"}}"#,
1327                        alloy_primitives::hex::encode_prefixed(expected_bytes_inner),
1328                    )))
1329                    .unwrap(),
1330                _ => request
1331                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1332                    .unwrap(),
1333            };
1334        });
1335
1336        let provider = get_http_provider(&endpoint);
1337        let meta = BlockchainDbMeta {
1338            cfg_env: Default::default(),
1339            block_env: Default::default(),
1340            hosts: BTreeSet::from([endpoint.to_string()]),
1341        };
1342
1343        let db = BlockchainDb::new(meta, None);
1344        let provider_inner = provider.clone();
1345        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1346
1347        let actual_response_bytes = backend
1348            .do_any_request(async move {
1349                let bytes: alloy_primitives::Bytes =
1350                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1351                Ok(bytes)
1352            })
1353            .expect("failed performing any request");
1354
1355        assert_eq!(actual_response_bytes, expected_response_bytes);
1356
1357        server_handle.join().unwrap();
1358    }
1359}