foundry_fork_db/
backend.rs

//! Smart caching and deduplication of requests when using a forking provider.

use crate::{
    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
    error::{DatabaseError, DatabaseResult},
};
use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
use alloy_provider::{
    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
    Provider,
};
use alloy_rpc_types::BlockId;
use eyre::WrapErr;
use futures::{
    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
    stream::Stream,
    task::{Context, Poll},
    Future, FutureExt,
};
use revm::{
    database::DatabaseRef,
    primitives::{
        map::{hash_map::Entry, AddressHashMap, HashMap},
        KECCAK_EMPTY,
    },
    state::{AccountInfo, Bytecode},
};
use std::{
    collections::VecDeque,
    fmt,
    future::IntoFuture,
    path::Path,
    pin::Pin,
    sync::{
        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
        Arc,
    },
};

/// Logged when an error indicates that the user is trying to fork from a non-archive node.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";

// Various future/request type aliases

type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

type AddressData = AddressHashMap<AccountInfo>;
type StorageData = AddressHashMap<StorageInfo>;
type BlockHashData = HashMap<U256, B256>;

struct AnyRequestFuture<T, Err> {
    sender: OneshotSender<Result<T, Err>>,
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}

impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
    }
}

trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

/// @dev Implements `WrappedAnyRequest` for `AnyRequestFuture`.
///
/// - `poll_inner` is similar to `Future` polling but intentionally consumes the output of the
///   `Future<Output = T>` and behaves like a `Future<Output = ()>`.
/// - This design avoids storing `Future<Output = T>` directly, as its type may not be known at
///   compile time.
/// - Instead, the result (`Result<T, Err>`) is sent via the `sender` channel, which enforces type
///   safety.
impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
where
    T: fmt::Debug + Send + 'static,
    Err: fmt::Debug + Send + 'static,
{
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(result) => {
                let _ = self.sender.send(result);
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Request variants that are executed by the provider
enum ProviderRequest<Err> {
    Account(AccountFuture<Err>),
    Storage(StorageFuture<Err>),
    BlockHash(BlockHashFuture<Err>),
    FullBlock(FullBlockFuture<Err>),
    Transaction(TransactionFuture<Err>),
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

/// The Request type the Backend listens for
#[derive(Debug)]
enum BackendRequest {
    /// Fetch the account info
    Basic(Address, AccountInfoSender),
    /// Fetch a storage slot
    Storage(Address, U256, StorageSender),
    /// Fetch a block hash
    BlockHash(u64, BlockHashSender),
    /// Fetch an entire block with transactions
    FullBlock(BlockId, FullBlockSender),
    /// Fetch a transaction
    Transaction(B256, TransactionSender),
    /// Sets the pinned block to fetch data from
    SetPinnedBlock(BlockId),

    /// Update Address data
    UpdateAddress(AddressData),
    /// Update Storage data
    UpdateStorage(StorageData),
    /// Update Block Hashes
    UpdateBlockHash(BlockHashData),
    /// Any other request
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

/// Handles an internal provider and listens for requests.
///
/// This handler will remain active as long as it is reachable (request channel still open) and
/// requests are in progress.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    provider: P,
    /// Stores all the data.
    db: BlockchainDb,
    /// Requests currently in progress
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners that wait for a `get_account` related response
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners that wait for a `get_storage_at` response
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners that wait for a `get_block` response
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming commands.
    incoming: UnboundedReceiver<BackendRequest>,
    /// unprocessed queued requests
    queued_requests: VecDeque<BackendRequest>,
    /// The block to fetch data from.
    // This is an `Option` so that we can have less code churn in the functions below
    block_id: Option<BlockId>,
}

impl<P> BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    fn new(
        provider: P,
        db: BlockchainDb,
        rx: UnboundedReceiver<BackendRequest>,
        block_id: Option<BlockId>,
    ) -> Self {
        Self {
            provider,
            db,
            pending_requests: Default::default(),
            account_requests: Default::default(),
            storage_requests: Default::default(),
            block_requests: Default::default(),
            queued_requests: Default::default(),
            incoming: rx,
            block_id,
        }
    }

    /// Handles a queued request.
    ///
    /// We always check:
    ///  1. if the requested value is already stored in the cache, then answer the sender
    ///  2. otherwise, fetch it via the provider, but first check whether a request for that value
    ///     is already in progress (e.g. another sender just requested the same account)
    fn on_request(&mut self, req: BackendRequest) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                // check if the storage slot is already stored in the cache
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    // slot not cached yet -> fetch it via the provider
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }

    /// process a request for an account's storage
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }

    /// returns the future that fetches the account data
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let fut = Box::pin(async move {
            let balance = provider.get_balance(address).block_id(block_id).into_future();
            let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
            let code = provider.get_code_at(address).block_id(block_id).into_future();
            let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
            (resp, address)
        });
        ProviderRequest::Account(fut)
    }

    /// process a request for an account
    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
        match self.account_requests.entry(address) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![listener]);
                self.pending_requests.push(self.get_account_req(address));
            }
        }
    }

    /// process a request for an entire block
    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_block(number)
                .full()
                .await
                .wrap_err(format!("could not fetch block {number:?}"));
            (sender, block, number)
        });

        self.pending_requests.push(ProviderRequest::FullBlock(fut));
    }

    /// process a request for a transaction
    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_transaction_by_hash(tx)
                .await
                .wrap_err_with(|| format!("could not get transaction {tx}"))
                .and_then(|maybe| {
                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
                });
            (sender, block, tx)
        });

        self.pending_requests.push(ProviderRequest::Transaction(fut));
    }

    /// process a request for a block hash
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(number.into())
                        .hashes()
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header.hash),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // if no block was returned then the block does not exist, in which case
                            // we return empty hash
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
}

impl<P> Future for BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // Drain queued requests first.
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // receive new requests to delegate to the underlying provider
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // poll all requests in progress
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            // get the response
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // convert it to revm-style types
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            // update the cache
                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                            };
                            pin.db.accounts().write().insert(addr, acc.clone());

                            // notify all listeners
                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    // notify all listeners
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            // notify all listeners
                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    // notify all listeners
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            // notify all listeners
                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // not ready, insert and poll again
                pin.pending_requests.push(request);
            }

            // If no new requests have been queued, return `Pending` and
            // wait to be polled again.
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}

/// Determines how the `SharedBackend` blocks in the non-async [`DatabaseRef`] methods when
/// interacting with the [`BackendHandler`].
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// This mode uses `tokio::task::block_in_place()` to block in place.
    ///
    /// This should be used when blocking on the call site is disallowed.
    #[default]
    BlockInPlace,
    /// This mode blocks the current task.
    ///
    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
    Block,
}

impl BlockingMode {
    /// Runs the given closure according to the blocking mode.
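    ///
    /// A minimal sketch; `load_value` is a placeholder for any blocking closure:
    ///
    /// ```ignore
    /// let mode = BlockingMode::Block;
    /// let value = mode.run(|| load_value());
    /// ```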
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        match self {
            Self::BlockInPlace => tokio::task::block_in_place(f),
            Self::Block => f(),
        }
    }
}

/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend type is connected to the `BackendHandler` via an mpsc unbounded channel. The
/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
/// `Backend` type is thread safe.
///
/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
/// that is used by the `BackendHandler` to send the result of an executed `BackendRequest` back to
/// the `SharedBackend`.
///
/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
/// bundles them together, so that only one provider request is ever executed. For example, two
/// `SharedBackend`s, `A` and `B`, both request the basic account info of account
/// `0xasd9sa7d...` at the same time. After the `BackendHandler` receives the request from `A`, it
/// sends a new provider request to the provider's endpoint. It then reads the identical request
/// from `B` and simply adds it as an additional listener for the request already in progress,
/// instead of sending another one. Once the provider returns the response, all listeners
/// (`A` and `B`) are notified.
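///
/// # Example
///
/// A minimal usage sketch; `provider` (an `AnyNetwork` provider), `db` (a `BlockchainDb`), and
/// `address` are placeholders assumed to exist in the calling code:
///
/// ```ignore
/// use revm::database::DatabaseRef;
///
/// let backend = SharedBackend::spawn_backend(Arc::new(provider), db, None).await;
/// let account = backend.basic_ref(address)?;
/// let slot = backend.storage_ref(address, U256::ZERO)?;
/// ```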
// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
// the underlying [BackendHandler] which runs on a separate spawned tokio task.
// [tokio::task::block_in_place()]
// > Runs the provided blocking function on the current thread without blocking the executor.
// This prevents issues (hangs) we ran into where the [SharedBackend] itself is called from a
// spawned task.
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// channel used for sending commands related to database operations
    backend: UnboundedSender<BackendRequest>,
    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
    ///
    /// There is only one instance of the type, so as soon as the last `SharedBackend` is deleted,
    /// `FlushJsonBlockCacheDB` is also deleted and the cache is flushed.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// The mode for the `SharedBackend` to block in place or not
    blocking_mode: BlockingMode,
}

impl SharedBackend {
    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
    /// `SharedBackend`. Missing values get inserted in the `db`.
    ///
    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
    /// dropped.
    ///
    /// NOTE: this should be called with `Arc<Provider>`
    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);
        // spawn the provider handler to a task
        trace!(target: "backendhandler", "spawning Backendhandler task");
        tokio::spawn(handler);
        shared
    }

    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
    /// its own `tokio::Runtime`
    pub fn spawn_backend_thread<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);

        // spawn a light-weight thread with a thread-local async runtime just for
        // sending and receiving data from the remote client
        std::thread::Builder::new()
            .name("fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                rt.block_on(handler);
            })
            .expect("failed to spawn thread");
        trace!(target: "backendhandler", "spawned Backendhandler thread");

        shared
    }

    /// Returns a new `SharedBackend` and the `BackendHandler`
    pub fn new<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> (Self, BackendHandler<P>)
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (backend, backend_rx) = unbounded();
        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
        (Self { backend, cache, blocking_mode: Default::default() }, handler)
    }

    /// Returns a clone of this `SharedBackend` that uses the given blocking mode.
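    ///
    /// A minimal sketch, assuming `shared` is an existing `SharedBackend` used from a context
    /// where blocking the calling thread is acceptable:
    ///
    /// ```ignore
    /// let blocking = shared.with_blocking_mode(BlockingMode::Block);
    /// ```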
    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
    }

    /// Updates the pinned block to fetch data from
    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
        let req = BackendRequest::SetPinnedBlock(block.into());
        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Returns the full block for the given block identifier
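    ///
    /// A minimal sketch (`backend` is an existing `SharedBackend`):
    ///
    /// ```ignore
    /// let latest = backend.get_full_block(BlockId::latest())?;
    /// ```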
    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::FullBlock(block.into(), sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Returns the transaction for the hash
    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Transaction(tx, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Basic(address, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?.map(Some)
        })
    }

    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Storage(address, index, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::BlockHash(number, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Inserts or updates data for multiple addresses
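    ///
    /// A minimal sketch, assuming `addr` is an `Address` and `info` its `AccountInfo`:
    ///
    /// ```ignore
    /// let mut data = AddressData::default();
    /// data.insert(addr, info);
    /// backend.insert_or_update_address(data);
    /// ```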
    pub fn insert_or_update_address(&self, address_data: AddressData) {
        let req = BackendRequest::UpdateAddress(address_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
        }
    }

    /// Inserts or updates data for multiple storage slots
    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
        let req = BackendRequest::UpdateStorage(storage_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e)
        }
    }

    /// Inserts or updates data for multiple block hashes
    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
        let req = BackendRequest::UpdateBlockHash(block_hash_data);
        if let Err(e) = self.backend.unbounded_send(req) {
            error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e)
        }
    }

    /// Executes an arbitrary request against the provider and returns the result.
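    ///
    /// A minimal sketch; the RPC method name is illustrative only (see the
    /// `shared_backend_any_request` test below for a runnable variant):
    ///
    /// ```ignore
    /// let bytes: Bytes = backend.do_any_request(async move {
    ///     let out: Bytes = provider.raw_request("eth_someCustomMethod".into(), ()).await?;
    ///     Ok(out)
    /// })?;
    /// ```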
    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
    where
        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
        T: fmt::Debug + Send + 'static,
    {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
                sender,
                future: Box::pin(fut),
            }));
            self.backend.unbounded_send(req)?;
            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
        })
    }

    /// Flushes the DB to disk if caching is enabled
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the DB to a specific file
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns the DB
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns the DB accounts
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the DB accounts length
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns the DB storage
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the DB storage length
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns the DB block_hashes
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the DB block_hashes length
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
}

impl DatabaseRef for SharedBackend {
    type Error = DatabaseError;

    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        trace!(target: "sharedbackend", %address, "request basic");
        self.do_get_basic(address).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
        Err(DatabaseError::MissingCode(hash))
    }

    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
        self.do_get_storage(address, index).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
        self.do_get_block_hash(number).map_err(|err| {
            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
    use alloy_provider::ProviderBuilder;
    use alloy_rpc_client::ClientBuilder;
    use serde::Deserialize;
    use std::{collections::BTreeSet, fs, path::PathBuf};
    use tiny_http::{Response, Server};

    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
        ProviderBuilder::new()
            .network::<AnyNetwork>()
            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
    }

    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");

    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
        let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }

    #[test]
    fn can_read_cache() {
        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
        let json = JsonBlockCacheDB::load(cache_path).unwrap();
        assert!(!json.db().accounts.read().is_empty());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in iteration {}",
                            idx
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in iteration {}",
                            idx
                        );

                        // comparing with db
                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in iteration {}",
                            idx
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in iteration {}",
                            idx
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        // let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {} did not match",
                            key
                        );

                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {} did not match", key);
                    }
                    Err(err) => panic!("Hash not found, error: {}", err),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage_with_cache() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        // create a temporary file
        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
        let backend =
            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
        address_data.code = None;

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
        // replace the code
        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));

        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let backend_clone = backend.clone();

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }

            backend_clone.flush_cache();
        });
        handle.join().unwrap();

        // read json and confirm the changes to the data

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let json_db = BlockchainDb::new(meta, Some(cache_path));

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        storage_data.insert(address, storage_info);

        // redo the checks with the data extracted from the json file
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = {
                            let storage = json_db.storage().read();
                            let address_storage = storage.get(address).unwrap().clone();
                            *address_storage.get(index).unwrap()
                        };

                        assert_eq!(
                            result_storage, *value,
                            "Storage in slot number {} in address {} doesn't have the same value",
                            index, address
                        );
                    }
                }
            }
        });

        handle.join().unwrap();

        // erase the temporary file
        fs::remove_file("test-data/storage-tmp.json").unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend_any_request() {
        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
        let endpoint = format!("http://{}", server.server_addr());

        // Spin up an in-memory server that responds to the "foo_callCustomMethod" rpc call.
        let expected_bytes_inner = expected_response_bytes.clone();
        let server_handle = std::thread::spawn(move || {
            #[derive(Debug, Deserialize)]
            struct Request {
                method: String,
            }
            let mut request = server.recv().unwrap();
            let rpc_request: Request =
                serde_json::from_reader(request.as_reader()).expect("failed parsing request");

            match rpc_request.method.as_str() {
                "foo_callCustomMethod" => request
                    .respond(Response::from_string(format!(
                        r#"{{"result": "{}"}}"#,
                        alloy_primitives::hex::encode_prefixed(expected_bytes_inner),
                    )))
                    .unwrap(),
                _ => request
                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
                    .unwrap(),
            };
        });

        let provider = get_http_provider(&endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let provider_inner = provider.clone();
        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let actual_response_bytes = backend
            .do_any_request(async move {
                let bytes: alloy_primitives::Bytes =
                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
                Ok(bytes)
            })
            .expect("failed performing any request");

        assert_eq!(actual_response_bytes, expected_response_bytes);

        server_handle.join().unwrap();
    }
}