foundry_fork_db/
backend.rs

//! Smart caching and deduplication of requests when using a forking provider.

use crate::{
    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
    error::{DatabaseError, DatabaseResult},
};
use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
use alloy_provider::{
    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
    Provider,
};
use alloy_rpc_types::BlockId;
use eyre::WrapErr;
use futures::{
    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
    stream::Stream,
    task::{Context, Poll},
    Future, FutureExt,
};
use revm::{
    db::DatabaseRef,
    primitives::{
        map::{hash_map::Entry, AddressHashMap, HashMap},
        AccountInfo, Bytecode, KECCAK_EMPTY,
    },
};
use std::{
    collections::VecDeque,
    fmt,
    future::IntoFuture,
    path::Path,
    pin::Pin,
    sync::{
        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
        Arc,
    },
};

/// Logged when an error indicates that the user is trying to fork from a non-archive node.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";

// Various future/request type aliases

type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

type AddressData = AddressHashMap<AccountInfo>;
type StorageData = AddressHashMap<StorageInfo>;
type BlockHashData = HashMap<U256, B256>;

struct AnyRequestFuture<T, Err> {
    sender: OneshotSender<Result<T, Err>>,
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}

impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
    }
}

trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

/// Implements `WrappedAnyRequest` for `AnyRequestFuture`.
///
/// - `poll_inner` is similar to `Future::poll` but intentionally consumes the `Future<Output = T>`
///   result and exposes it only as `Poll<()>`.
/// - This design avoids storing `Future<Output = T>` directly, as its type may not be known at
///   compile time.
/// - Instead, the result (`Result<T, Err>`) is sent via the `sender` channel, which enforces type
///   safety.
impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
where
    T: fmt::Debug + Send + 'static,
    Err: fmt::Debug + Send + 'static,
{
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(result) => {
                let _ = self.sender.send(result);
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Request variants that are executed by the provider
enum ProviderRequest<Err> {
    Account(AccountFuture<Err>),
    Storage(StorageFuture<Err>),
    BlockHash(BlockHashFuture<Err>),
    FullBlock(FullBlockFuture<Err>),
    Transaction(TransactionFuture<Err>),
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

/// The Request type the Backend listens for
#[derive(Debug)]
enum BackendRequest {
    /// Fetch the account info
    Basic(Address, AccountInfoSender),
    /// Fetch a storage slot
    Storage(Address, U256, StorageSender),
    /// Fetch a block hash
    BlockHash(u64, BlockHashSender),
    /// Fetch an entire block with transactions
    FullBlock(BlockId, FullBlockSender),
    /// Fetch a transaction
    Transaction(B256, TransactionSender),
    /// Sets the pinned block to fetch data from
    SetPinnedBlock(BlockId),

    /// Update Address data
    UpdateAddress(AddressData),
    /// Update Storage data
    UpdateStorage(StorageData),
    /// Update Block Hashes
    UpdateBlockHash(BlockHashData),
    /// Any other request
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

/// Handles an internal provider and listens for requests.
///
/// This handler will remain active as long as it is reachable (request channel still open) and
/// requests are in progress.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    provider: P,
    /// Stores all the data.
    db: BlockchainDb,
    /// Requests currently in progress
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners that wait for a `get_account` related response
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners that wait for a `get_storage_at` response
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners that wait for a `get_block` response
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming commands.
    incoming: UnboundedReceiver<BackendRequest>,
    /// Unprocessed queued requests.
    queued_requests: VecDeque<BackendRequest>,
    /// The block to fetch data from.
    // This is an `Option` so that we can have less code churn in the functions below
    block_id: Option<BlockId>,
}

impl<P> BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    fn new(
        provider: P,
        db: BlockchainDb,
        rx: UnboundedReceiver<BackendRequest>,
        block_id: Option<BlockId>,
    ) -> Self {
        Self {
            provider,
            db,
            pending_requests: Default::default(),
            account_requests: Default::default(),
            storage_requests: Default::default(),
            block_requests: Default::default(),
            queued_requests: Default::default(),
            incoming: rx,
            block_id,
        }
    }

    /// Handles a queued request, either answering it from the cache or dispatching a provider
    /// request.
    ///
    /// We always check:
    ///  1. if the requested value is already stored in the cache, then answer the sender
    ///  2. otherwise, fetch it via the provider, but check whether a request for that value is
    ///     already in progress (e.g. another sender just requested the same account)
    fn on_request(&mut self, req: BackendRequest) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                // the storage slot may already be cached
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    // slot not cached -> fetch it from the provider
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }

    /// process a request for account's storage
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }

    /// returns the future that fetches the account data
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let fut = Box::pin(async move {
            let balance = provider.get_balance(address).block_id(block_id).into_future();
            let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
            let code = provider.get_code_at(address).block_id(block_id).into_future();
            let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
            (resp, address)
        });
        ProviderRequest::Account(fut)
    }

    /// process a request for an account
    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
        match self.account_requests.entry(address) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![listener]);
                self.pending_requests.push(self.get_account_req(address));
            }
        }
    }

    /// process a request for an entire block
    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_block(number)
                .full()
                .await
                .wrap_err(format!("could not fetch block {number:?}"));
            (sender, block, number)
        });

        self.pending_requests.push(ProviderRequest::FullBlock(fut));
    }

    /// process a request for a transaction
    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_transaction_by_hash(tx)
                .await
                .wrap_err_with(|| format!("could not get transaction {tx}"))
                .and_then(|maybe| {
                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
                });
            (sender, block, tx)
        });

        self.pending_requests.push(ProviderRequest::Transaction(fut));
    }

    /// process a request for a block hash
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(number.into())
                        .hashes()
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header.hash),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // if no block was returned then the block does not exist, in which case
                            // we return empty hash
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
}

impl<P> Future for BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // Drain queued requests first.
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // receive new requests to delegate to the underlying provider
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // poll all requests in progress
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            // get the response
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // convert it to revm-style types
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            // update the cache
                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                            };
                            pin.db.accounts().write().insert(addr, acc.clone());

                            // notify all listeners
                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    // notify all listeners
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            // notify all listeners
                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    // notify all listeners
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            // notify all listeners
                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // not ready, insert and poll again
                pin.pending_requests.push(request);
            }

            // If no new requests have been queued, return `Pending` and
            // wait to be polled again later.
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}

/// Determines how the `SharedBackend` blocks in the non-async [`DatabaseRef`] implementation when
/// interacting with the [`BackendHandler`].
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// This mode uses `tokio::task::block_in_place()` to block in place.
    ///
    /// This should be used when blocking on the call site is disallowed.
    #[default]
    BlockInPlace,
    /// This mode blocks on the current thread.
    ///
    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
    Block,
}

impl BlockingMode {
    /// Runs the given closure according to the blocking mode.
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        match self {
            Self::BlockInPlace => tokio::task::block_in_place(f),
            Self::Block => f(),
        }
    }
}

/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend type is connected to the `BackendHandler` via a mpsc unbounded channel. The
/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
/// `Backend` type is thread safe.
///
/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
/// that is used by the `BackendHandler` to send the result of an executed `BackendRequest` back to
/// `SharedBackend`.
///
/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
/// bundles them together, so that only one provider request is ever executed. For example, if two
/// `SharedBackend`s, `A` and `B`, both request the basic account info of account
/// `0xasd9sa7d...` at the same time, then after the `BackendHandler` receives the request from `A`
/// it sends a new provider request to the provider's endpoint; when it then reads the identical
/// request from `B`, it simply adds `B` as an additional listener for the request already in
/// progress, instead of sending another one. After the provider returns the response, all listeners
/// (`A` and `B`) get notified.
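///
/// # Example
///
/// A minimal usage sketch (illustrative only; the provider construction mirrors the test helper
/// below, and `rpc_url`, `meta`, and `address` are assumed to exist in the caller's scope):
///
/// ```ignore
/// use alloy_provider::{network::AnyNetwork, ProviderBuilder};
/// use std::sync::Arc;
///
/// let provider = ProviderBuilder::new()
///     .network::<AnyNetwork>()
///     .on_client(alloy_rpc_client::ClientBuilder::default().http(rpc_url.parse()?));
/// let db = BlockchainDb::new(meta, None);
/// let backend = SharedBackend::spawn_backend(Arc::new(provider), db, None).await;
///
/// // `SharedBackend` implements `DatabaseRef`: reads hit the in-memory cache first and
/// // only fall back to the remote provider on a miss.
/// let account = backend.basic_ref(address)?;
/// let slot = backend.storage_ref(address, U256::ZERO)?;
/// ```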
// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
// the underlying [BackendHandler] which runs on a separate spawned tokio task.
// [tokio::task::block_in_place()]
// > Runs the provided blocking function on the current thread without blocking the executor.
// This prevents issues (hangs) we ran into where the [SharedBackend] itself is called from a
// spawned task.
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// channel used for sending commands related to database operations
    backend: UnboundedSender<BackendRequest>,
    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
    ///
    /// There is only one instance of the type, so as soon as the last `SharedBackend` is deleted,
    /// `FlushJsonBlockCacheDB` is also deleted and the cache is flushed.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// The mode for the `SharedBackend` to block in place or not
    blocking_mode: BlockingMode,
}

impl SharedBackend {
    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
    /// `SharedBackend`. Missing values get inserted in the `db`.
    ///
    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
    /// dropped.
    ///
    /// NOTE: this should be called with `Arc<Provider>`
    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);
        // spawn the provider handler to a task
        trace!(target: "backendhandler", "spawning Backendhandler task");
        tokio::spawn(handler);
        shared
    }

    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
    /// its own `tokio::Runtime`
    pub fn spawn_backend_thread<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);

        // spawn a light-weight thread with a thread-local async runtime just for
        // sending and receiving data from the remote client
        std::thread::Builder::new()
            .name("fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                rt.block_on(handler);
            })
            .expect("failed to spawn thread");
        trace!(target: "backendhandler", "spawned Backendhandler thread");

        shared
    }

    /// Returns a new `SharedBackend` and the `BackendHandler`
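    ///
    /// # Example
    ///
    /// A sketch of driving the handler manually instead of using [`Self::spawn_backend`]
    /// (illustrative; `provider`, `db`, and `address` are assumed to exist):
    ///
    /// ```ignore
    /// let (backend, handler) = SharedBackend::new(provider, db, None);
    /// // The handler is a `Future`; it must be spawned (or otherwise polled) for the
    /// // `SharedBackend` to make progress.
    /// tokio::spawn(handler);
    /// let account = backend.basic_ref(address)?;
    /// ```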
    pub fn new<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> (Self, BackendHandler<P>)
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (backend, backend_rx) = unbounded();
        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
        (Self { backend, cache, blocking_mode: Default::default() }, handler)
    }

    /// Returns a clone of this `SharedBackend` that uses the given blocking mode
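    ///
    /// # Example
    ///
    /// A sketch (assumes `backend` was created via one of the spawn methods and that the caller is
    /// on a thread where blocking is allowed):
    ///
    /// ```ignore
    /// // `BlockingMode::Block` skips `tokio::task::block_in_place`, e.g. when the caller
    /// // runs inside `tokio::task::spawn_blocking` or on a plain `std::thread`.
    /// let blocking_backend = backend.with_blocking_mode(BlockingMode::Block);
    /// let value = blocking_backend.storage_ref(address, U256::ZERO)?;
    /// ```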
    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
    }

    /// Updates the pinned block to fetch data from
    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
        let req = BackendRequest::SetPinnedBlock(block.into());
        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Returns the full block for the given block identifier
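    ///
    /// # Example
    ///
    /// A sketch (assumes a spawned `backend`):
    ///
    /// ```ignore
    /// let block = backend.get_full_block(BlockId::latest())?;
    /// println!("latest block hash: {}", block.header.hash);
    /// ```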
    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::FullBlock(block.into(), sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Returns the transaction for the hash
    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Transaction(tx, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Basic(address, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?.map(Some)
        })
    }

    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Storage(address, index, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::BlockHash(number, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Inserts or updates data for multiple addresses
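    ///
    /// # Example
    ///
    /// A sketch of seeding the cache with a pre-built account, mirroring the tests below
    /// (`address` and a spawned `backend` are assumed to exist):
    ///
    /// ```ignore
    /// let mut address_data = AddressData::default();
    /// address_data.insert(
    ///     address,
    ///     AccountInfo { nonce: 1, balance: U256::from(1_000), code: None, code_hash: KECCAK_EMPTY },
    /// );
    /// backend.insert_or_update_address(address_data);
    /// ```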
    pub fn insert_or_update_address(&self, address_data: AddressData) {
        let req = BackendRequest::UpdateAddress(address_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
        }
    }

    /// Inserts or updates data for multiple storage slots
    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
        let req = BackendRequest::UpdateStorage(storage_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e)
        }
    }

    /// Inserts or updates data for multiple block hashes
    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
        let req = BackendRequest::UpdateBlockHash(block_hash_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e)
        }
    }

    /// Executes an arbitrary request on the provider and returns its result
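    ///
    /// # Example
    ///
    /// A sketch of issuing a custom RPC call through the handler, mirroring the
    /// `shared_backend_any_request` test below (the method name and parameters are illustrative):
    ///
    /// ```ignore
    /// let provider = provider.clone();
    /// // Note: `do_any_request` takes `&mut self`, so `backend` must be mutable.
    /// let bytes: Bytes = backend.do_any_request(async move {
    ///     let bytes: alloy_primitives::Bytes =
    ///         provider.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
    ///     Ok(bytes)
    /// })?;
    /// ```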
    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
    where
        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
        T: fmt::Debug + Send + 'static,
    {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
                sender,
                future: Box::pin(fut),
            }));
            self.backend.unbounded_send(req)?;
            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
        })
    }

    /// Flushes the DB to disk if caching is enabled
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the DB to a specific file
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns the DB
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns the DB accounts
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the DB accounts length
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns the DB storage
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the DB storage length
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns the DB block_hashes
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the DB block_hashes length
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
}

impl DatabaseRef for SharedBackend {
    type Error = DatabaseError;

    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        trace!(target: "sharedbackend", %address, "request basic");
        self.do_get_basic(address).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
        Err(DatabaseError::MissingCode(hash))
    }

    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
        self.do_get_storage(address, index).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
        self.do_get_block_hash(number).map_err(|err| {
            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
    use alloy_provider::ProviderBuilder;
    use alloy_rpc_client::ClientBuilder;
    use serde::Deserialize;
    use std::{collections::BTreeSet, fs, path::PathBuf};
    use tiny_http::{Response, Server};

    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
        ProviderBuilder::new()
            .network::<AnyNetwork>()
            .on_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
    }

    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");

    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
        let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }

    #[test]
    fn can_read_cache() {
        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
        let json = JsonBlockCacheDB::load(cache_path).unwrap();
        assert!(!json.db().accounts.read().is_empty());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );

                        // comparing with db
                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        // let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {} did not match",
                            key
                        );

                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {} did not match", key);
                    }
                    Err(err) => panic!("Hash not found, error: {}", err),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage_with_cache() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        // create a temporary file
        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
        let backend =
            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
        address_data.code = None;

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
        // overwrite the code with some placeholder raw bytes
        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));

        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let backend_clone = backend.clone();

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }

            backend_clone.flush_cache();
        });
        handle.join().unwrap();

        // read json and confirm the changes to the data

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let json_db = BlockchainDb::new(meta, Some(cache_path));

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        storage_data.insert(address, storage_info);

        // redo the checks with the data extracted from the json file
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = {
                            let storage = json_db.storage().read();
                            let address_storage = storage.get(address).unwrap().clone();
                            *address_storage.get(index).unwrap()
                        };

                        assert_eq!(
                            result_storage, *value,
                            "Storage in slot number {} in address {} doesn't have the same value",
                            index, address
                        );
                    }
                }
            }
        });

        handle.join().unwrap();

        // erase the temporary file
        fs::remove_file("test-data/storage-tmp.json").unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend_any_request() {
        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
        let endpoint = format!("http://{}", server.server_addr());

        // Spin up an in-memory server that responds to the "foo_callCustomMethod" RPC call.
        let expected_bytes_inner = expected_response_bytes.clone();
        let server_handle = std::thread::spawn(move || {
            #[derive(Debug, Deserialize)]
            struct Request {
                method: String,
            }
            let mut request = server.recv().unwrap();
            let rpc_request: Request =
                serde_json::from_reader(request.as_reader()).expect("failed parsing request");

            match rpc_request.method.as_str() {
                "foo_callCustomMethod" => request
                    .respond(Response::from_string(format!(
                        r#"{{"result": "{}"}}"#,
                        alloy_primitives::hex::encode_prefixed(expected_bytes_inner),
                    )))
                    .unwrap(),
                _ => request
                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
                    .unwrap(),
            };
        });

        let provider = get_http_provider(&endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let provider_inner = provider.clone();
        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let actual_response_bytes = backend
            .do_any_request(async move {
                let bytes: alloy_primitives::Bytes =
                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
                Ok(bytes)
            })
            .expect("failed performing any request");

        assert_eq!(actual_response_bytes, expected_response_bytes);

        server_handle.join().unwrap();
    }
}