foundry_fork_db/
backend.rs

1//! Smart caching and deduplication of requests when using a forking provider.
2
3use crate::{
4    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5    error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
10    Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16    pin_mut,
17    stream::Stream,
18    task::{Context, Poll},
19    Future, FutureExt,
20};
21use revm::{
22    database::DatabaseRef,
23    primitives::{
24        map::{hash_map::Entry, AddressHashMap, HashMap},
25        KECCAK_EMPTY,
26    },
27    state::{AccountInfo, Bytecode},
28};
29use std::{
30    collections::VecDeque,
31    fmt,
32    future::IntoFuture,
33    path::Path,
34    pin::Pin,
35    sync::{
36        atomic::{AtomicU8, Ordering},
37        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38        Arc,
39    },
40};
41use tokio::select;
42
43/// Logged when an error is indicative that the user is trying to fork from a non-archive node.
44pub const NON_ARCHIVE_NODE_WARNING: &str = "\
45It looks like you're trying to fork from an older block with a non-archive node which is not \
46supported. Please try to change your RPC url to an archive node if the issue persists.";
47
48// Various future/request type aliases
49
50type AccountFuture<Err> =
51    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
52type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
53type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
54type FullBlockFuture<Err> = Pin<
55    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
56>;
57type TransactionFuture<Err> =
58    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;
59
60type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
61type StorageSender = OneshotSender<DatabaseResult<U256>>;
62type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
63type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
64type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;
65
66type AddressData = AddressHashMap<AccountInfo>;
67type StorageData = AddressHashMap<StorageInfo>;
68type BlockHashData = HashMap<U256, B256>;
69
70/// States for tracking which account endpoints should be used when account info
71const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
72/// Endpoints supports the non standard eth_getAccountInfo which is more efficient than sending 3
73/// separate requests
74const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
75/// Use regular individual getCode, getNonce, getBalance calls
76const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
77
78struct AnyRequestFuture<T, Err> {
79    sender: OneshotSender<Result<T, Err>>,
80    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
81}
82
83impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
86    }
87}
88
89trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
90    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
91}
92
93/// @dev Implements `WrappedAnyRequest` for `AnyRequestFuture`.
94///
95/// - `poll_inner` is similar to `Future` polling but intentionally consumes the Future<Output=T>
96///   and return Future<Output=()>
97/// - This design avoids storing `Future<Output = T>` directly, as its type may not be known at
98///   compile time.
99/// - Instead, the result (`Result<T, Err>`) is sent via the `sender` channel, which enforces type
100///   safety.
101impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
102where
103    T: fmt::Debug + Send + 'static,
104    Err: fmt::Debug + Send + 'static,
105{
106    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
107        match self.future.poll_unpin(cx) {
108            Poll::Ready(result) => {
109                let _ = self.sender.send(result);
110                Poll::Ready(())
111            }
112            Poll::Pending => Poll::Pending,
113        }
114    }
115}
116
117/// Request variants that are executed by the provider
118enum ProviderRequest<Err> {
119    Account(AccountFuture<Err>),
120    Storage(StorageFuture<Err>),
121    BlockHash(BlockHashFuture<Err>),
122    FullBlock(FullBlockFuture<Err>),
123    Transaction(TransactionFuture<Err>),
124    AnyRequest(Box<dyn WrappedAnyRequest>),
125}
126
127/// The Request type the Backend listens for
128#[derive(Debug)]
129enum BackendRequest {
130    /// Fetch the account info
131    Basic(Address, AccountInfoSender),
132    /// Fetch a storage slot
133    Storage(Address, U256, StorageSender),
134    /// Fetch a block hash
135    BlockHash(u64, BlockHashSender),
136    /// Fetch an entire block with transactions
137    FullBlock(BlockId, FullBlockSender),
138    /// Fetch a transaction
139    Transaction(B256, TransactionSender),
140    /// Sets the pinned block to fetch data from
141    SetPinnedBlock(BlockId),
142
143    /// Update Address data
144    UpdateAddress(AddressData),
145    /// Update Storage data
146    UpdateStorage(StorageData),
147    /// Update Block Hashes
148    UpdateBlockHash(BlockHashData),
149    /// Any other request
150    AnyRequest(Box<dyn WrappedAnyRequest>),
151}
152
153/// Handles an internal provider and listens for requests.
154///
155/// This handler will remain active as long as it is reachable (request channel still open) and
156/// requests are in progress.
157#[must_use = "futures do nothing unless polled"]
158pub struct BackendHandler<P> {
159    provider: P,
160    /// Stores all the data.
161    db: BlockchainDb,
162    /// Requests currently in progress
163    pending_requests: Vec<ProviderRequest<eyre::Report>>,
164    /// Listeners that wait for a `get_account` related response
165    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
166    /// Listeners that wait for a `get_storage_at` response
167    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
168    /// Listeners that wait for a `get_block` response
169    block_requests: HashMap<u64, Vec<BlockHashSender>>,
170    /// Incoming commands.
171    incoming: UnboundedReceiver<BackendRequest>,
172    /// unprocessed queued requests
173    queued_requests: VecDeque<BackendRequest>,
174    /// The block to fetch data from.
175    // This is an `Option` so that we can have less code churn in the functions below
176    block_id: Option<BlockId>,
177    /// The mode for fetching account data
178    account_fetch_mode: Arc<AtomicU8>,
179}
180
181impl<P> BackendHandler<P>
182where
183    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
184{
185    fn new(
186        provider: P,
187        db: BlockchainDb,
188        rx: UnboundedReceiver<BackendRequest>,
189        block_id: Option<BlockId>,
190    ) -> Self {
191        Self {
192            provider,
193            db,
194            pending_requests: Default::default(),
195            account_requests: Default::default(),
196            storage_requests: Default::default(),
197            block_requests: Default::default(),
198            queued_requests: Default::default(),
199            incoming: rx,
200            block_id,
201            account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
202        }
203    }
204
205    /// handle the request in queue in the future.
206    ///
207    /// We always check:
208    ///  1. if the requested value is already stored in the cache, then answer the sender
209    ///  2. otherwise, fetch it via the provider but check if a request for that value is already in
210    ///     progress (e.g. another Sender just requested the same account)
211    fn on_request(&mut self, req: BackendRequest) {
212        match req {
213            BackendRequest::Basic(addr, sender) => {
214                trace!(target: "backendhandler", "received request basic address={:?}", addr);
215                let acc = self.db.accounts().read().get(&addr).cloned();
216                if let Some(basic) = acc {
217                    let _ = sender.send(Ok(basic));
218                } else {
219                    self.request_account(addr, sender);
220                }
221            }
222            BackendRequest::BlockHash(number, sender) => {
223                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
224                if let Some(hash) = hash {
225                    let _ = sender.send(Ok(hash));
226                } else {
227                    self.request_hash(number, sender);
228                }
229            }
230            BackendRequest::FullBlock(number, sender) => {
231                self.request_full_block(number, sender);
232            }
233            BackendRequest::Transaction(tx, sender) => {
234                self.request_transaction(tx, sender);
235            }
236            BackendRequest::Storage(addr, idx, sender) => {
237                // account is already stored in the cache
238                let value =
239                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
240                if let Some(value) = value {
241                    let _ = sender.send(Ok(value));
242                } else {
243                    // account present but not storage -> fetch storage
244                    self.request_account_storage(addr, idx, sender);
245                }
246            }
247            BackendRequest::SetPinnedBlock(block_id) => {
248                self.block_id = Some(block_id);
249            }
250            BackendRequest::UpdateAddress(address_data) => {
251                for (address, data) in address_data {
252                    self.db.accounts().write().insert(address, data);
253                }
254            }
255            BackendRequest::UpdateStorage(storage_data) => {
256                for (address, data) in storage_data {
257                    self.db.storage().write().insert(address, data);
258                }
259            }
260            BackendRequest::UpdateBlockHash(block_hash_data) => {
261                for (block, hash) in block_hash_data {
262                    self.db.block_hashes().write().insert(block, hash);
263                }
264            }
265            BackendRequest::AnyRequest(fut) => {
266                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
267            }
268        }
269    }
270
271    /// process a request for account's storage
272    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
273        match self.storage_requests.entry((address, idx)) {
274            Entry::Occupied(mut entry) => {
275                entry.get_mut().push(listener);
276            }
277            Entry::Vacant(entry) => {
278                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
279                entry.insert(vec![listener]);
280                let provider = self.provider.clone();
281                let block_id = self.block_id.unwrap_or_default();
282                let fut = Box::pin(async move {
283                    let storage = provider
284                        .get_storage_at(address, idx)
285                        .block_id(block_id)
286                        .await
287                        .map_err(Into::into);
288                    (storage, address, idx)
289                });
290                self.pending_requests.push(ProviderRequest::Storage(fut));
291            }
292        }
293    }
294
295    /// returns the future that fetches the account data
296    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
297        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
298
299        let provider = self.provider.clone();
300        let block_id = self.block_id.unwrap_or_default();
301        let mode = Arc::clone(&self.account_fetch_mode);
302        let fut = async move {
303            // depending on the tracked mode we can dispatch requests.
304            let initial_mode = mode.load(Ordering::Relaxed);
305            match initial_mode {
306                ACCOUNT_FETCH_UNCHECKED => {
307                    // single request for accountinfo object
308                    let acc_info_fut =
309                        provider.get_account_info(address).block_id(block_id).into_future();
310
311                    // tri request for account info
312                    let balance_fut =
313                        provider.get_balance(address).block_id(block_id).into_future();
314                    let nonce_fut =
315                        provider.get_transaction_count(address).block_id(block_id).into_future();
316                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
317                    let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
318                    pin_mut!(acc_info_fut, triple_fut);
319
320                    select! {
321                        acc_info = &mut acc_info_fut => {
322                            match acc_info {
323                                Ok(info) => {
324                                 trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
325                                    mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
326                                    Ok((info.balance, info.nonce, info.code))
327                                }
328                                Err(err) => {
329                                    trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
330                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
331                                    Ok(triple_fut.await?)
332                                }
333                            }
334                        }
335                        triple = &mut triple_fut => {
336                            match triple {
337                                Ok((balance, nonce, code)) => {
338                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
339                                    Ok((balance, nonce, code))
340                                }
341                                Err(err) => Err(err.into())
342                            }
343                        }
344                    }
345                }
346
347                ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
348                    let mut res = provider
349                        .get_account_info(address)
350                        .block_id(block_id)
351                        .into_future()
352                        .await
353                        .map(|info| (info.balance, info.nonce, info.code));
354
355                    // it's possible that the configured endpoint load balances requests to multiple
356                    // instances and not all support that endpoint so we should reset here
357                    if res.is_err() {
358                        mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
359
360                        let balance_fut =
361                            provider.get_balance(address).block_id(block_id).into_future();
362                        let nonce_fut = provider
363                            .get_transaction_count(address)
364                            .block_id(block_id)
365                            .into_future();
366                        let code_fut =
367                            provider.get_code_at(address).block_id(block_id).into_future();
368                        res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
369                    }
370
371                    Ok(res?)
372                }
373
374                ACCOUNT_FETCH_SEPARATE_REQUESTS => {
375                    let balance_fut =
376                        provider.get_balance(address).block_id(block_id).into_future();
377                    let nonce_fut =
378                        provider.get_transaction_count(address).block_id(block_id).into_future();
379                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
380
381                    Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
382                }
383
384                _ => unreachable!("Invalid account fetch mode"),
385            }
386        };
387
388        ProviderRequest::Account(Box::pin(async move {
389            let result = fut.await;
390            (result, address)
391        }))
392    }
393
394    /// process a request for an account
395    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
396        match self.account_requests.entry(address) {
397            Entry::Occupied(mut entry) => {
398                entry.get_mut().push(listener);
399            }
400            Entry::Vacant(entry) => {
401                entry.insert(vec![listener]);
402                self.pending_requests.push(self.get_account_req(address));
403            }
404        }
405    }
406
407    /// process a request for an entire block
408    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
409        let provider = self.provider.clone();
410        let fut = Box::pin(async move {
411            let block = provider
412                .get_block(number)
413                .full()
414                .await
415                .wrap_err(format!("could not fetch block {number:?}"));
416            (sender, block, number)
417        });
418
419        self.pending_requests.push(ProviderRequest::FullBlock(fut));
420    }
421
422    /// process a request for a transactions
423    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
424        let provider = self.provider.clone();
425        let fut = Box::pin(async move {
426            let block = provider
427                .get_transaction_by_hash(tx)
428                .await
429                .wrap_err_with(|| format!("could not get transaction {tx}"))
430                .and_then(|maybe| {
431                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
432                });
433            (sender, block, tx)
434        });
435
436        self.pending_requests.push(ProviderRequest::Transaction(fut));
437    }
438
439    /// process a request for a block hash
440    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
441        match self.block_requests.entry(number) {
442            Entry::Occupied(mut entry) => {
443                entry.get_mut().push(listener);
444            }
445            Entry::Vacant(entry) => {
446                trace!(target: "backendhandler", number, "preparing block hash request");
447                entry.insert(vec![listener]);
448                let provider = self.provider.clone();
449                let fut = Box::pin(async move {
450                    let block = provider
451                        .get_block_by_number(number.into())
452                        .hashes()
453                        .await
454                        .wrap_err("failed to get block");
455
456                    let block_hash = match block {
457                        Ok(Some(block)) => Ok(block.header.hash),
458                        Ok(None) => {
459                            warn!(target: "backendhandler", ?number, "block not found");
460                            // if no block was returned then the block does not exist, in which case
461                            // we return empty hash
462                            Ok(KECCAK_EMPTY)
463                        }
464                        Err(err) => {
465                            error!(target: "backendhandler", %err, ?number, "failed to get block");
466                            Err(err)
467                        }
468                    };
469                    (block_hash, number)
470                });
471                self.pending_requests.push(ProviderRequest::BlockHash(fut));
472            }
473        }
474    }
475}
476
477impl<P> Future for BackendHandler<P>
478where
479    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
480{
481    type Output = ();
482
483    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
484        let pin = self.get_mut();
485        loop {
486            // Drain queued requests first.
487            while let Some(req) = pin.queued_requests.pop_front() {
488                pin.on_request(req)
489            }
490
491            // receive new requests to delegate to the underlying provider
492            loop {
493                match Pin::new(&mut pin.incoming).poll_next(cx) {
494                    Poll::Ready(Some(req)) => {
495                        pin.queued_requests.push_back(req);
496                    }
497                    Poll::Ready(None) => {
498                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
499                        return Poll::Ready(());
500                    }
501                    Poll::Pending => break,
502                }
503            }
504
505            // poll all requests in progress
506            for n in (0..pin.pending_requests.len()).rev() {
507                let mut request = pin.pending_requests.swap_remove(n);
508                match &mut request {
509                    ProviderRequest::Account(fut) => {
510                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
511                            // get the response
512                            let (balance, nonce, code) = match resp {
513                                Ok(res) => res,
514                                Err(err) => {
515                                    let err = Arc::new(err);
516                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
517                                        listeners.into_iter().for_each(|l| {
518                                            let _ = l.send(Err(DatabaseError::GetAccount(
519                                                addr,
520                                                Arc::clone(&err),
521                                            )));
522                                        })
523                                    }
524                                    continue;
525                                }
526                            };
527
528                            // convert it to revm-style types
529                            let (code, code_hash) = if !code.is_empty() {
530                                (code.clone(), keccak256(&code))
531                            } else {
532                                (Bytes::default(), KECCAK_EMPTY)
533                            };
534
535                            // update the cache
536                            let acc = AccountInfo {
537                                nonce,
538                                balance,
539                                code: Some(Bytecode::new_raw(code)),
540                                code_hash,
541                            };
542                            pin.db.accounts().write().insert(addr, acc.clone());
543
544                            // notify all listeners
545                            if let Some(listeners) = pin.account_requests.remove(&addr) {
546                                listeners.into_iter().for_each(|l| {
547                                    let _ = l.send(Ok(acc.clone()));
548                                })
549                            }
550                            continue;
551                        }
552                    }
553                    ProviderRequest::Storage(fut) => {
554                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
555                            let value = match resp {
556                                Ok(value) => value,
557                                Err(err) => {
558                                    // notify all listeners
559                                    let err = Arc::new(err);
560                                    if let Some(listeners) =
561                                        pin.storage_requests.remove(&(addr, idx))
562                                    {
563                                        listeners.into_iter().for_each(|l| {
564                                            let _ = l.send(Err(DatabaseError::GetStorage(
565                                                addr,
566                                                idx,
567                                                Arc::clone(&err),
568                                            )));
569                                        })
570                                    }
571                                    continue;
572                                }
573                            };
574
575                            // update the cache
576                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);
577
578                            // notify all listeners
579                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
580                                listeners.into_iter().for_each(|l| {
581                                    let _ = l.send(Ok(value));
582                                })
583                            }
584                            continue;
585                        }
586                    }
587                    ProviderRequest::BlockHash(fut) => {
588                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
589                            let value = match block_hash {
590                                Ok(value) => value,
591                                Err(err) => {
592                                    let err = Arc::new(err);
593                                    // notify all listeners
594                                    if let Some(listeners) = pin.block_requests.remove(&number) {
595                                        listeners.into_iter().for_each(|l| {
596                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
597                                                number,
598                                                Arc::clone(&err),
599                                            )));
600                                        })
601                                    }
602                                    continue;
603                                }
604                            };
605
606                            // update the cache
607                            pin.db.block_hashes().write().insert(U256::from(number), value);
608
609                            // notify all listeners
610                            if let Some(listeners) = pin.block_requests.remove(&number) {
611                                listeners.into_iter().for_each(|l| {
612                                    let _ = l.send(Ok(value));
613                                })
614                            }
615                            continue;
616                        }
617                    }
618                    ProviderRequest::FullBlock(fut) => {
619                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
620                            let msg = match resp {
621                                Ok(Some(block)) => Ok(block),
622                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
623                                Err(err) => {
624                                    let err = Arc::new(err);
625                                    Err(DatabaseError::GetFullBlock(number, err))
626                                }
627                            };
628                            let _ = sender.send(msg);
629                            continue;
630                        }
631                    }
632                    ProviderRequest::Transaction(fut) => {
633                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
634                            let msg = match tx {
635                                Ok(tx) => Ok(tx),
636                                Err(err) => {
637                                    let err = Arc::new(err);
638                                    Err(DatabaseError::GetTransaction(tx_hash, err))
639                                }
640                            };
641                            let _ = sender.send(msg);
642                            continue;
643                        }
644                    }
645                    ProviderRequest::AnyRequest(fut) => {
646                        if fut.poll_inner(cx).is_ready() {
647                            continue;
648                        }
649                    }
650                }
651                // not ready, insert and poll again
652                pin.pending_requests.push(request);
653            }
654
655            // If no new requests have been queued, break to
656            // be polled again later.
657            if pin.queued_requests.is_empty() {
658                return Poll::Pending;
659            }
660        }
661    }
662}
663
664/// Mode for the `SharedBackend` how to block in the non-async [`DatabaseRef`] when interacting with
665/// [`BackendHandler`].
666#[derive(Default, Clone, Debug, PartialEq)]
667pub enum BlockingMode {
668    /// This mode use `tokio::task::block_in_place()` to block in place.
669    ///
670    /// This should be used when blocking on the call site is disallowed.
671    #[default]
672    BlockInPlace,
673    /// The mode blocks the current task
674    ///
675    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
676    Block,
677}
678
679impl BlockingMode {
680    /// run process logic with the blocking mode
681    pub fn run<F, R>(&self, f: F) -> R
682    where
683        F: FnOnce() -> R,
684    {
685        match self {
686            Self::BlockInPlace => tokio::task::block_in_place(f),
687            Self::Block => f(),
688        }
689    }
690}
691
692/// A cloneable backend type that shares access to the backend data with all its clones.
693///
694/// This backend type is connected to the `BackendHandler` via a mpsc unbounded channel. The
695/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
696/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
697/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
698/// `Backend` type is thread safe.
699///
700/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
701/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
702/// that is used by the `BackendHandler` to send the result of an executed `BackendRequest` back to
703/// `SharedBackend`.
704///
705/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
706/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
707/// bundles them together, so that always only one provider request is executed. For example, there
708/// are two `SharedBackend`s, `A` and `B`, both request the basic account info of account
709/// `0xasd9sa7d...` at the same time. After the `BackendHandler` receives the request from `A`, it
710/// sends a new provider request to the provider's endpoint, then it reads the identical request
711/// from `B` and simply adds it as an additional listener for the request already in progress,
712/// instead of sending another one. So that after the provider returns the response all listeners
713/// (`A` and `B`) get notified.
714// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
715// the underlying [BackendHandler] which runs on a separate spawned tokio task.
716// [tokio::task::block_in_place()]
717// > Runs the provided blocking function on the current thread without blocking the executor.
718// This prevents issues (hangs) we ran into were the [SharedBackend] itself is called from a spawned
719// task.
720#[derive(Clone, Debug)]
721pub struct SharedBackend {
722    /// channel used for sending commands related to database operations
723    backend: UnboundedSender<BackendRequest>,
724    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
725    ///
726    /// There is only one instance of the type, so as soon as the last `SharedBackend` is deleted,
727    /// `FlushJsonBlockCacheDB` is also deleted and the cache is flushed.
728    cache: Arc<FlushJsonBlockCacheDB>,
729
730    /// The mode for the `SharedBackend` to block in place or not
731    blocking_mode: BlockingMode,
732}
733
734impl SharedBackend {
735    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
736    /// `SharedBackend`. Missing values get inserted in the `db`.
737    ///
738    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
739    /// dropped.
740    ///
741    /// NOTE: this should be called with `Arc<Provider>`
742    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
743    where
744        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
745    {
746        let (shared, handler) = Self::new(provider, db, pin_block);
747        // spawn the provider handler to a task
748        trace!(target: "backendhandler", "spawning Backendhandler task");
749        tokio::spawn(handler);
750        shared
751    }
752
753    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
754    /// its own `tokio::Runtime`
755    pub fn spawn_backend_thread<P>(
756        provider: P,
757        db: BlockchainDb,
758        pin_block: Option<BlockId>,
759    ) -> Self
760    where
761        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
762    {
763        let (shared, handler) = Self::new(provider, db, pin_block);
764
765        // spawn a light-weight thread with a thread-local async runtime just for
766        // sending and receiving data from the remote client
767        std::thread::Builder::new()
768            .name("fork-backend".into())
769            .spawn(move || {
770                let rt = tokio::runtime::Builder::new_current_thread()
771                    .enable_all()
772                    .build()
773                    .expect("failed to build tokio runtime");
774
775                rt.block_on(handler);
776            })
777            .expect("failed to spawn thread");
778        trace!(target: "backendhandler", "spawned Backendhandler thread");
779
780        shared
781    }
782
783    /// Returns a new `SharedBackend` and the `BackendHandler`
784    pub fn new<P>(
785        provider: P,
786        db: BlockchainDb,
787        pin_block: Option<BlockId>,
788    ) -> (Self, BackendHandler<P>)
789    where
790        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
791    {
792        let (backend, backend_rx) = unbounded();
793        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
794        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
795        (Self { backend, cache, blocking_mode: Default::default() }, handler)
796    }
797
798    /// Returns a new `SharedBackend` and the `BackendHandler` with a specific blocking mode
799    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
800        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
801    }
802
803    /// Updates the pinned block to fetch data from
804    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
805        let req = BackendRequest::SetPinnedBlock(block.into());
806        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
807    }
808
809    /// Returns the full block for the given block identifier
810    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
811        self.blocking_mode.run(|| {
812            let (sender, rx) = oneshot_channel();
813            let req = BackendRequest::FullBlock(block.into(), sender);
814            self.backend.unbounded_send(req)?;
815            rx.recv()?
816        })
817    }
818
819    /// Returns the transaction for the hash
820    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
821        self.blocking_mode.run(|| {
822            let (sender, rx) = oneshot_channel();
823            let req = BackendRequest::Transaction(tx, sender);
824            self.backend.unbounded_send(req)?;
825            rx.recv()?
826        })
827    }
828
829    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
830        self.blocking_mode.run(|| {
831            let (sender, rx) = oneshot_channel();
832            let req = BackendRequest::Basic(address, sender);
833            self.backend.unbounded_send(req)?;
834            rx.recv()?.map(Some)
835        })
836    }
837
838    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
839        self.blocking_mode.run(|| {
840            let (sender, rx) = oneshot_channel();
841            let req = BackendRequest::Storage(address, index, sender);
842            self.backend.unbounded_send(req)?;
843            rx.recv()?
844        })
845    }
846
847    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
848        self.blocking_mode.run(|| {
849            let (sender, rx) = oneshot_channel();
850            let req = BackendRequest::BlockHash(number, sender);
851            self.backend.unbounded_send(req)?;
852            rx.recv()?
853        })
854    }
855
856    /// Inserts or updates data for multiple addresses
857    pub fn insert_or_update_address(&self, address_data: AddressData) {
858        let req = BackendRequest::UpdateAddress(address_data);
859        let err = self.backend.unbounded_send(req);
860        match err {
861            Ok(_) => (),
862            Err(e) => {
863                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
864            }
865        }
866    }
867
868    /// Inserts or updates data for multiple storage slots
869    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
870        let req = BackendRequest::UpdateStorage(storage_data);
871        let err = self.backend.unbounded_send(req);
872        match err {
873            Ok(_) => (),
874            Err(e) => {
875                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
876            }
877        }
878    }
879
880    /// Inserts or updates data for multiple block hashes
881    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
882        let req = BackendRequest::UpdateBlockHash(block_hash_data);
883        let err = self.backend.unbounded_send(req);
884        match err {
885            Ok(_) => (),
886            Err(e) => {
887                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
888            }
889        }
890    }
891
892    /// Returns any arbitrary request on the provider
893    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
894    where
895        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
896        T: fmt::Debug + Send + 'static,
897    {
898        self.blocking_mode.run(|| {
899            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
900            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
901                sender,
902                future: Box::pin(fut),
903            }));
904            self.backend.unbounded_send(req)?;
905            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
906        })
907    }
908
909    /// Flushes the DB to disk if caching is enabled
910    pub fn flush_cache(&self) {
911        self.cache.0.flush();
912    }
913
914    /// Flushes the DB to a specific file
915    pub fn flush_cache_to(&self, cache_path: &Path) {
916        self.cache.0.flush_to(cache_path);
917    }
918
919    /// Returns the DB
920    pub fn data(&self) -> Arc<MemDb> {
921        self.cache.0.db().clone()
922    }
923
924    /// Returns the DB accounts
925    pub fn accounts(&self) -> AddressData {
926        self.cache.0.db().accounts.read().clone()
927    }
928
929    /// Returns the DB accounts length
930    pub fn accounts_len(&self) -> usize {
931        self.cache.0.db().accounts.read().len()
932    }
933
934    /// Returns the DB storage
935    pub fn storage(&self) -> StorageData {
936        self.cache.0.db().storage.read().clone()
937    }
938
939    /// Returns the DB storage length
940    pub fn storage_len(&self) -> usize {
941        self.cache.0.db().storage.read().len()
942    }
943
944    /// Returns the DB block_hashes
945    pub fn block_hashes(&self) -> BlockHashData {
946        self.cache.0.db().block_hashes.read().clone()
947    }
948
949    /// Returns the DB block_hashes length
950    pub fn block_hashes_len(&self) -> usize {
951        self.cache.0.db().block_hashes.read().len()
952    }
953}
954
955impl DatabaseRef for SharedBackend {
956    type Error = DatabaseError;
957
958    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
959        trace!(target: "sharedbackend", %address, "request basic");
960        self.do_get_basic(address).map_err(|err| {
961            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
962            if err.is_possibly_non_archive_node_error() {
963                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
964            }
965            err
966        })
967    }
968
969    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
970        Err(DatabaseError::MissingCode(hash))
971    }
972
973    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
974        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
975        self.do_get_storage(address, index).map_err(|err| {
976            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
977            if err.is_possibly_non_archive_node_error() {
978                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
979            }
980          err
981        })
982    }
983
984    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
985        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
986        self.do_get_block_hash(number).map_err(|err| {
987            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
988            if err.is_possibly_non_archive_node_error() {
989                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
990            }
991            err
992        })
993    }
994}
995
996#[cfg(test)]
997mod tests {
998    use super::*;
999    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
1000    use alloy_consensus::BlockHeader;
1001    use alloy_provider::ProviderBuilder;
1002    use alloy_rpc_client::ClientBuilder;
1003    use serde::Deserialize;
1004    use std::{fs, path::PathBuf};
1005    use tiny_http::{Response, Server};
1006
1007    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
1008        ProviderBuilder::new()
1009            .network::<AnyNetwork>()
1010            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
1011    }
1012
1013    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
1014
1015    #[tokio::test(flavor = "multi_thread")]
1016    async fn test_builder() {
1017        let Some(endpoint) = ENDPOINT else { return };
1018        let provider = get_http_provider(endpoint);
1019
1020        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
1021        let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
1022
1023        assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
1024    }
1025
1026    #[tokio::test(flavor = "multi_thread")]
1027    async fn shared_backend() {
1028        let Some(endpoint) = ENDPOINT else { return };
1029
1030        let provider = get_http_provider(endpoint);
1031        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1032
1033        let db = BlockchainDb::new(meta, None);
1034        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1035
1036        // some rng contract from etherscan
1037        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1038
1039        let idx = U256::from(0u64);
1040        let value = backend.storage_ref(address, idx).unwrap();
1041        let account = backend.basic_ref(address).unwrap().unwrap();
1042
1043        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
1044        assert_eq!(account.balance, mem_acc.balance);
1045        assert_eq!(account.nonce, mem_acc.nonce);
1046        let slots = db.storage().read().get(&address).unwrap().clone();
1047        assert_eq!(slots.len(), 1);
1048        assert_eq!(slots.get(&idx).copied().unwrap(), value);
1049
1050        let num = 10u64;
1051        let hash = backend.block_hash_ref(num).unwrap();
1052        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
1053        assert_eq!(hash, mem_hash);
1054
1055        let max_slots = 5;
1056        let handle = std::thread::spawn(move || {
1057            for i in 1..max_slots {
1058                let idx = U256::from(i);
1059                let _ = backend.storage_ref(address, idx);
1060            }
1061        });
1062        handle.join().unwrap();
1063        let slots = db.storage().read().get(&address).unwrap().clone();
1064        assert_eq!(slots.len() as u64, max_slots);
1065    }
1066
1067    #[test]
1068    fn can_read_cache() {
1069        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
1070        let json = JsonBlockCacheDB::load(cache_path).unwrap();
1071        assert!(!json.db().accounts.read().is_empty());
1072    }
1073
1074    #[tokio::test(flavor = "multi_thread")]
1075    async fn can_modify_address() {
1076        let Some(endpoint) = ENDPOINT else { return };
1077
1078        let provider = get_http_provider(endpoint);
1079        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1080
1081        let db = BlockchainDb::new(meta, None);
1082        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1083
1084        // some rng contract from etherscan
1085        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1086
1087        let new_acc = AccountInfo {
1088            nonce: 1000u64,
1089            balance: U256::from(2000),
1090            code: None,
1091            code_hash: KECCAK_EMPTY,
1092        };
1093        let mut account_data = AddressData::default();
1094        account_data.insert(address, new_acc.clone());
1095
1096        backend.insert_or_update_address(account_data);
1097
1098        let max_slots = 5;
1099        let handle = std::thread::spawn(move || {
1100            for i in 1..max_slots {
1101                let idx = U256::from(i);
1102                let result_address = backend.basic_ref(address).unwrap();
1103                match result_address {
1104                    Some(acc) => {
1105                        assert_eq!(
1106                            acc.nonce, new_acc.nonce,
1107                            "The nonce was not changed in instance of index {idx}"
1108                        );
1109                        assert_eq!(
1110                            acc.balance, new_acc.balance,
1111                            "The balance was not changed in instance of index {idx}"
1112                        );
1113
1114                        // comparing with db
1115                        let db_address = {
1116                            let accounts = db.accounts().read();
1117                            accounts.get(&address).unwrap().clone()
1118                        };
1119
1120                        assert_eq!(
1121                            db_address.nonce, new_acc.nonce,
1122                            "The nonce was not changed in instance of index {idx}"
1123                        );
1124                        assert_eq!(
1125                            db_address.balance, new_acc.balance,
1126                            "The balance was not changed in instance of index {idx}"
1127                        );
1128                    }
1129                    None => panic!("Account not found"),
1130                }
1131            }
1132        });
1133        handle.join().unwrap();
1134    }
1135
1136    #[tokio::test(flavor = "multi_thread")]
1137    async fn can_modify_storage() {
1138        let Some(endpoint) = ENDPOINT else { return };
1139
1140        let provider = get_http_provider(endpoint);
1141        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1142
1143        let db = BlockchainDb::new(meta, None);
1144        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1145
1146        // some rng contract from etherscan
1147        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1148
1149        let mut storage_data = StorageData::default();
1150        let mut storage_info = StorageInfo::default();
1151        storage_info.insert(U256::from(20), U256::from(10));
1152        storage_info.insert(U256::from(30), U256::from(15));
1153        storage_info.insert(U256::from(40), U256::from(20));
1154
1155        storage_data.insert(address, storage_info);
1156
1157        backend.insert_or_update_storage(storage_data.clone());
1158
1159        let max_slots = 5;
1160        let handle = std::thread::spawn(move || {
1161            for _ in 1..max_slots {
1162                for (address, info) in &storage_data {
1163                    for (index, value) in info {
1164                        let result_storage = backend.do_get_storage(*address, *index);
1165                        match result_storage {
1166                            Ok(stg_db) => {
1167                                assert_eq!(
1168                                    stg_db, *value,
1169                                    "Storage in slot number {index} in address {address} do not have the same value"
1170                                );
1171
1172                                let db_result = {
1173                                    let storage = db.storage().read();
1174                                    let address_storage = storage.get(address).unwrap();
1175                                    *address_storage.get(index).unwrap()
1176                                };
1177
1178                                assert_eq!(
1179                                    stg_db, db_result,
1180                                    "Storage in slot number {index} in address {address} do not have the same value"
1181                                )
1182                            }
1183
1184                            Err(err) => {
1185                                panic!("There was a database error: {err}")
1186                            }
1187                        }
1188                    }
1189                }
1190            }
1191        });
1192        handle.join().unwrap();
1193    }
1194
1195    #[tokio::test(flavor = "multi_thread")]
1196    async fn can_modify_block_hashes() {
1197        let Some(endpoint) = ENDPOINT else { return };
1198
1199        let provider = get_http_provider(endpoint);
1200        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1201
1202        let db = BlockchainDb::new(meta, None);
1203        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1204
1205        // some rng contract from etherscan
1206        // let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1207
1208        let mut block_hash_data = BlockHashData::default();
1209        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1210        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1211        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1212        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1213        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1214
1215        backend.insert_or_update_block_hashes(block_hash_data.clone());
1216
1217        let max_slots: u64 = 5;
1218        let handle = std::thread::spawn(move || {
1219            for i in 1..max_slots {
1220                let key = U256::from(i);
1221                let result_hash = backend.do_get_block_hash(i);
1222                match result_hash {
1223                    Ok(hash) => {
1224                        assert_eq!(
1225                            hash,
1226                            *block_hash_data.get(&key).unwrap(),
1227                            "The hash in block {key} did not match"
1228                        );
1229
1230                        let db_result = {
1231                            let hashes = db.block_hashes().read();
1232                            *hashes.get(&key).unwrap()
1233                        };
1234
1235                        assert_eq!(hash, db_result, "The hash in block {key} did not match");
1236                    }
1237                    Err(err) => panic!("Hash not found, error: {err}"),
1238                }
1239            }
1240        });
1241        handle.join().unwrap();
1242    }
1243
1244    #[tokio::test(flavor = "multi_thread")]
1245    async fn can_modify_storage_with_cache() {
1246        let Some(endpoint) = ENDPOINT else { return };
1247
1248        let provider = get_http_provider(endpoint);
1249        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1250
1251        // create a temporary file
1252        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1253
1254        let cache_path =
1255            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1256
1257        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1258        let backend =
1259            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1260
1261        // some rng contract from etherscan
1262        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1263
1264        let mut storage_data = StorageData::default();
1265        let mut storage_info = StorageInfo::default();
1266        storage_info.insert(U256::from(1), U256::from(10));
1267        storage_info.insert(U256::from(2), U256::from(15));
1268        storage_info.insert(U256::from(3), U256::from(20));
1269        storage_info.insert(U256::from(4), U256::from(20));
1270        storage_info.insert(U256::from(5), U256::from(15));
1271        storage_info.insert(U256::from(6), U256::from(10));
1272
1273        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1274        address_data.code = None;
1275
1276        storage_data.insert(address, storage_info);
1277
1278        backend.insert_or_update_storage(storage_data.clone());
1279
1280        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1281        // nullify the code
1282        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1283
1284        let mut account_data = AddressData::default();
1285        account_data.insert(address, new_acc.clone());
1286
1287        backend.insert_or_update_address(account_data);
1288
1289        let backend_clone = backend.clone();
1290
1291        let max_slots = 5;
1292        let handle = std::thread::spawn(move || {
1293            for _ in 1..max_slots {
1294                for (address, info) in &storage_data {
1295                    for (index, value) in info {
1296                        let result_storage = backend.do_get_storage(*address, *index);
1297                        match result_storage {
1298                            Ok(stg_db) => {
1299                                assert_eq!(
1300                                    stg_db, *value,
1301                                    "Storage in slot number {index} in address {address} doesn't have the same value"
1302                                );
1303
1304                                let db_result = {
1305                                    let storage = db.storage().read();
1306                                    let address_storage = storage.get(address).unwrap();
1307                                    *address_storage.get(index).unwrap()
1308                                };
1309
1310                                assert_eq!(
1311                                    stg_db, db_result,
1312                                    "Storage in slot number {index} in address {address} doesn't have the same value"
1313                                );
1314                            }
1315
1316                            Err(err) => {
1317                                panic!("There was a database error: {err}")
1318                            }
1319                        }
1320                    }
1321                }
1322            }
1323
1324            backend_clone.flush_cache();
1325        });
1326        handle.join().unwrap();
1327
1328        // read json and confirm the changes to the data
1329
1330        let cache_path =
1331            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1332
1333        let json_db = BlockchainDb::new(meta, Some(cache_path));
1334
1335        let mut storage_data = StorageData::default();
1336        let mut storage_info = StorageInfo::default();
1337        storage_info.insert(U256::from(1), U256::from(10));
1338        storage_info.insert(U256::from(2), U256::from(15));
1339        storage_info.insert(U256::from(3), U256::from(20));
1340        storage_info.insert(U256::from(4), U256::from(20));
1341        storage_info.insert(U256::from(5), U256::from(15));
1342        storage_info.insert(U256::from(6), U256::from(10));
1343
1344        storage_data.insert(address, storage_info);
1345
1346        // redo the checks with the data extracted from the json file
1347        let max_slots = 5;
1348        let handle = std::thread::spawn(move || {
1349            for _ in 1..max_slots {
1350                for (address, info) in &storage_data {
1351                    for (index, value) in info {
1352                        let result_storage = {
1353                            let storage = json_db.storage().read();
1354                            let address_storage = storage.get(address).unwrap().clone();
1355                            *address_storage.get(index).unwrap()
1356                        };
1357
1358                        assert_eq!(
1359                            result_storage, *value,
1360                            "Storage in slot number {index} in address {address} doesn't have the same value"
1361                        );
1362                    }
1363                }
1364            }
1365        });
1366
1367        handle.join().unwrap();
1368
1369        // erase the temporary file
1370        fs::remove_file("test-data/storage-tmp.json").unwrap();
1371    }
1372
1373    #[tokio::test(flavor = "multi_thread")]
1374    async fn shared_backend_any_request() {
1375        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1376        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1377        let endpoint = format!("http://{}", server.server_addr());
1378
1379        // Spin an in-memory server that responds to "foo_callCustomMethod" rpc call.
1380        let expected_bytes_innner = expected_response_bytes.clone();
1381        let server_handle = std::thread::spawn(move || {
1382            #[derive(Debug, Deserialize)]
1383            struct Request {
1384                method: String,
1385            }
1386            let mut request = server.recv().unwrap();
1387            let rpc_request: Request =
1388                serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1389
1390            match rpc_request.method.as_str() {
1391                "foo_callCustomMethod" => request
1392                    .respond(Response::from_string(format!(
1393                        r#"{{"result": "{}"}}"#,
1394                        alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1395                    )))
1396                    .unwrap(),
1397                _ => request
1398                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1399                    .unwrap(),
1400            };
1401        });
1402
1403        let provider = get_http_provider(&endpoint);
1404        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1405
1406        let db = BlockchainDb::new(meta, None);
1407        let provider_inner = provider.clone();
1408        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1409
1410        let actual_response_bytes = backend
1411            .do_any_request(async move {
1412                let bytes: alloy_primitives::Bytes =
1413                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1414                Ok(bytes)
1415            })
1416            .expect("failed performing any request");
1417
1418        assert_eq!(actual_response_bytes, expected_response_bytes);
1419
1420        server_handle.join().unwrap();
1421    }
1422}