// foundry-fork-db: backend.rs

1//! Smart caching and deduplication of requests when using a forking provider.
2
3use crate::{
4    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5    error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
10    DynProvider, Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16    pin_mut,
17    stream::Stream,
18    task::{Context, Poll},
19    Future, FutureExt,
20};
21use revm::{
22    database::DatabaseRef,
23    primitives::{
24        map::{hash_map::Entry, AddressHashMap, HashMap},
25        KECCAK_EMPTY,
26    },
27    state::{AccountInfo, Bytecode},
28};
29use std::{
30    collections::VecDeque,
31    fmt,
32    future::IntoFuture,
33    path::Path,
34    pin::Pin,
35    sync::{
36        atomic::{AtomicU8, Ordering},
37        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38        Arc,
39    },
40};
41use tokio::select;
42
/// Logged when an error is indicative that the user is trying to fork from a non-archive node.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";

// Various future/request type aliases

/// Future resolving to `(balance, nonce, code)` for an account, tagged with the queried address.
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
/// Future resolving to a storage value, tagged with the queried `(address, slot)` pair.
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
/// Future resolving to a block hash, tagged with the queried block number.
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
/// Future resolving to a full block (including transactions); carries the response channel because
/// full-block requests are answered directly rather than deduplicated via listener lists.
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
/// Future resolving to a transaction; carries the response channel and the queried hash.
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

// Oneshot-style response channels (std mpsc senders) used to answer `BackendRequest`s.
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

// Batched cache-update payloads.
type AddressData = AddressHashMap<AccountInfo>;
type StorageData = AddressHashMap<StorageInfo>;
type BlockHashData = HashMap<U256, B256>;

/// States for tracking which account endpoints should be used when fetching account info.
///
/// Initial state: support for `eth_getAccountInfo` has not been probed yet.
const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
/// Endpoint supports the non-standard `eth_getAccountInfo`, which is more efficient than sending 3
/// separate requests.
const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
/// Use regular individual getCode, getNonce, getBalance calls.
const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
77
/// A type-erasable provider request: the future doing the work paired with the channel that
/// delivers its result back to the requester.
struct AnyRequestFuture<T, Err> {
    /// Channel on which the finished `Result<T, Err>` is sent.
    sender: OneshotSender<Result<T, Err>>,
    /// The in-flight request itself.
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}
82
83impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
86    }
87}
88
/// Object-safe wrapper over [`AnyRequestFuture`] so requests with arbitrary output types can be
/// stored homogeneously alongside the other pending provider requests.
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    /// Polls the wrapped future; on completion the result has already been forwarded through the
    /// stored sender, hence the `()` output.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
92
93/// @dev Implements `WrappedAnyRequest` for `AnyRequestFuture`.
94///
95/// - `poll_inner` is similar to `Future` polling but intentionally consumes the Future<Output=T>
96///   and return Future<Output=()>
97/// - This design avoids storing `Future<Output = T>` directly, as its type may not be known at
98///   compile time.
99/// - Instead, the result (`Result<T, Err>`) is sent via the `sender` channel, which enforces type
100///   safety.
101impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
102where
103    T: fmt::Debug + Send + 'static,
104    Err: fmt::Debug + Send + 'static,
105{
106    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
107        match self.future.poll_unpin(cx) {
108            Poll::Ready(result) => {
109                let _ = self.sender.send(result);
110                Poll::Ready(())
111            }
112            Poll::Pending => Poll::Pending,
113        }
114    }
115}
116
/// Request variants that are executed by the provider
enum ProviderRequest<Err> {
    /// In-flight account fetch resolving to `(balance, nonce, code)`.
    Account(AccountFuture<Err>),
    /// In-flight storage-slot fetch.
    Storage(StorageFuture<Err>),
    /// In-flight block-hash fetch.
    BlockHash(BlockHashFuture<Err>),
    /// In-flight full-block fetch (with transactions); carries its own response channel.
    FullBlock(FullBlockFuture<Err>),
    /// In-flight transaction fetch by hash; carries its own response channel.
    Transaction(TransactionFuture<Err>),
    /// In-flight type-erased request; the result is delivered through its embedded sender.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
126
/// The Request type the Backend listens for
#[derive(Debug)]
enum BackendRequest {
    /// Fetch the account info
    Basic(Address, AccountInfoSender),
    /// Fetch a storage slot
    Storage(Address, U256, StorageSender),
    /// Fetch a block hash
    BlockHash(u64, BlockHashSender),
    /// Fetch an entire block with transactions
    FullBlock(BlockId, FullBlockSender),
    /// Fetch a transaction
    Transaction(B256, TransactionSender),
    /// Sets the pinned block to fetch data from
    SetPinnedBlock(BlockId),

    /// Insert the given account infos directly into the cache.
    UpdateAddress(AddressData),
    /// Insert the given storage maps directly into the cache.
    UpdateStorage(StorageData),
    /// Insert the given block hashes directly into the cache.
    UpdateBlockHash(BlockHashData),
    /// Any other request, executed as a type-erased future.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
152
/// Handles an internal provider and listens for requests.
///
/// This handler will remain active as long as it is reachable (request channel still open) and
/// requests are in progress.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler {
    /// The provider used to fetch data from remote.
    provider: DynProvider<AnyNetwork>,
    /// Stores all the data.
    db: BlockchainDb,
    /// Requests currently in progress
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners that wait for a `get_account` related response
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners that wait for a `get_storage_at` response
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners that wait for a `get_block` response
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming commands.
    incoming: UnboundedReceiver<BackendRequest>,
    /// unprocessed queued requests
    queued_requests: VecDeque<BackendRequest>,
    /// The block to fetch data from.
    // This is an `Option` so that we can have less code churn in the functions below
    block_id: Option<BlockId>,
    /// The mode for fetching account data (one of the `ACCOUNT_FETCH_*` states), shared with the
    /// in-flight account futures so they can record probe results.
    account_fetch_mode: Arc<AtomicU8>,
}
180
impl BackendHandler {
    /// Creates a new handler that serves requests from `rx` via `provider`, caching all fetched
    /// data in `db`.
    ///
    /// If `block_id` is `None`, requests default to the latest block.
    fn new(
        provider: DynProvider<AnyNetwork>,
        db: BlockchainDb,
        rx: UnboundedReceiver<BackendRequest>,
        block_id: Option<BlockId>,
    ) -> Self {
        Self {
            provider,
            db,
            pending_requests: Default::default(),
            account_requests: Default::default(),
            storage_requests: Default::default(),
            block_requests: Default::default(),
            queued_requests: Default::default(),
            incoming: rx,
            block_id,
            // Start unchecked: the first account request probes `eth_getAccountInfo` support.
            account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
        }
    }

    /// handle the request in queue in the future.
    ///
    /// We always check:
    ///  1. if the requested value is already stored in the cache, then answer the sender
    ///  2. otherwise, fetch it via the provider but check if a request for that value is already in
    ///     progress (e.g. another Sender just requested the same account)
    fn on_request(&mut self, req: BackendRequest) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    // cache hit: answer immediately
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    // cache hit: answer immediately
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            // full blocks and transactions are not cached here; always forwarded to the provider
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                // check if the storage slot is already cached
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    // slot not cached -> fetch storage from the provider
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }

    /// process a request for account's storage
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                // a request for this slot is already in flight; just register another listener
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }

    /// returns the future that fetches the account data as `(balance, nonce, code)`
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);

        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let mode = Arc::clone(&self.account_fetch_mode);
        let fut = async move {
            // depending on the tracked mode we can dispatch requests.
            let initial_mode = mode.load(Ordering::Relaxed);
            match initial_mode {
                ACCOUNT_FETCH_UNCHECKED => {
                    // probe: race the single `eth_getAccountInfo` request against the classic
                    // three-request combination and record which one completed

                    // single request for accountinfo object
                    let acc_info_fut =
                        provider.get_account_info(address).block_id(block_id).into_future();

                    // tri request for account info
                    let balance_fut =
                        provider.get_balance(address).block_id(block_id).into_future();
                    let nonce_fut =
                        provider.get_transaction_count(address).block_id(block_id).into_future();
                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
                    let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
                    pin_mut!(acc_info_fut, triple_fut);

                    // NOTE(review): if the triple finishes first, the mode is set to separate
                    // requests even though the endpoint might support `eth_getAccountInfo`.
                    select! {
                        acc_info = &mut acc_info_fut => {
                            match acc_info {
                                Ok(info) => {
                                    trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
                                    mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
                                    Ok((info.balance, info.nonce, info.code))
                                }
                                Err(err) => {
                                    trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
                                    // fall back to the already-running triple request
                                    Ok(triple_fut.await?)
                                }
                            }
                        }
                        triple = &mut triple_fut => {
                            match triple {
                                Ok((balance, nonce, code)) => {
                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
                                    Ok((balance, nonce, code))
                                }
                                Err(err) => Err(err.into())
                            }
                        }
                    }
                }

                ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
                    let mut res = provider
                        .get_account_info(address)
                        .block_id(block_id)
                        .into_future()
                        .await
                        .map(|info| (info.balance, info.nonce, info.code));

                    // it's possible that the configured endpoint load balances requests to multiple
                    // instances and not all support that endpoint so we should reset here
                    if res.is_err() {
                        mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);

                        let balance_fut =
                            provider.get_balance(address).block_id(block_id).into_future();
                        let nonce_fut = provider
                            .get_transaction_count(address)
                            .block_id(block_id)
                            .into_future();
                        let code_fut =
                            provider.get_code_at(address).block_id(block_id).into_future();
                        res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
                    }

                    Ok(res?)
                }

                ACCOUNT_FETCH_SEPARATE_REQUESTS => {
                    // classic mode: three individual requests joined together
                    let balance_fut =
                        provider.get_balance(address).block_id(block_id).into_future();
                    let nonce_fut =
                        provider.get_transaction_count(address).block_id(block_id).into_future();
                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();

                    Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
                }

                _ => unreachable!("Invalid account fetch mode"),
            }
        };

        // tag the result with the address so listeners can be resolved when it completes
        ProviderRequest::Account(Box::pin(async move {
            let result = fut.await;
            (result, address)
        }))
    }

    /// process a request for an account
    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
        match self.account_requests.entry(address) {
            Entry::Occupied(mut entry) => {
                // request already in flight; register another listener
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![listener]);
                self.pending_requests.push(self.get_account_req(address));
            }
        }
    }

    /// process a request for an entire block
    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_block(number)
                .full()
                .await
                .wrap_err(format!("could not fetch block {number:?}"));
            (sender, block, number)
        });

        self.pending_requests.push(ProviderRequest::FullBlock(fut));
    }

    /// process a request for a transaction
    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            // a missing transaction is treated as an error, not as `None`
            let block = provider
                .get_transaction_by_hash(tx)
                .await
                .wrap_err_with(|| format!("could not get transaction {tx}"))
                .and_then(|maybe| {
                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
                });
            (sender, block, tx)
        });

        self.pending_requests.push(ProviderRequest::Transaction(fut));
    }

    /// process a request for a block hash
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                // request already in flight; register another listener
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(number.into())
                        .hashes()
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header.hash),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // if no block was returned then the block does not exist, in which case
                            // we return empty hash
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
}
473
impl Future for BackendHandler {
    type Output = ();

    /// Event loop of the handler: drains queued requests, pulls new commands off the channel, and
    /// drives all in-flight provider requests.
    ///
    /// Completes only once the last `SharedBackend` (sender) has been dropped.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // Drain queued requests first.
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // receive new requests to delegate to the underlying provider
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // poll all requests in progress
            // iterate in reverse so `swap_remove` does not disturb the yet-unvisited indices;
            // a still-pending request is pushed back at the end of the loop body
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            // get the response
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    // share one error across all listeners via `Arc`
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // convert it to revm-style types
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            // update the cache
                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                                account_id: None,
                            };
                            pin.db.accounts().write().insert(addr, acc.clone());

                            // notify all listeners
                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    // notify all listeners
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            // notify all listeners
                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    // notify all listeners
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            // notify all listeners
                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    // full-block and transaction requests are not deduplicated: answer the single
                    // sender directly, without touching the cache
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        // the wrapped request sends its own result; nothing to do on completion
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // not ready, insert and poll again
                pin.pending_requests.push(request);
            }

            // If no new requests have been queued, break to
            // be polled again later.
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}
658
/// Mode for the `SharedBackend` how to block in the non-async [`DatabaseRef`] when interacting with
/// [`BackendHandler`].
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// This mode uses `tokio::task::block_in_place()` to block in place.
    ///
    /// This should be used when blocking on the call site is disallowed.
    #[default]
    BlockInPlace,
    /// This mode blocks the current task directly.
    ///
    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
    Block,
}
673
674impl BlockingMode {
675    /// run process logic with the blocking mode
676    pub fn run<F, R>(&self, f: F) -> R
677    where
678        F: FnOnce() -> R,
679    {
680        match self {
681            Self::BlockInPlace => tokio::task::block_in_place(f),
682            Self::Block => f(),
683        }
684    }
685}
686
/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend type is connected to the `BackendHandler` via a mpsc unbounded channel. The
/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
/// `Backend` type is thread safe.
///
/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
/// that is used by the `BackendHandler` to send the result of an executed `BackendRequest` back to
/// `SharedBackend`.
///
/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
/// bundles them together, so that always only one provider request is executed. For example, there
/// are two `SharedBackend`s, `A` and `B`, both request the basic account info of account
/// `0xasd9sa7d...` at the same time. After the `BackendHandler` receives the request from `A`, it
/// sends a new provider request to the provider's endpoint, then it reads the identical request
/// from `B` and simply adds it as an additional listener for the request already in progress,
/// instead of sending another one. So that after the provider returns the response all listeners
/// (`A` and `B`) get notified.
// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
// the underlying [BackendHandler] which runs on a separate spawned tokio task.
// [tokio::task::block_in_place()]
// > Runs the provided blocking function on the current thread without blocking the executor.
// This prevents issues (hangs) we ran into where the [SharedBackend] itself is called from a
// spawned task.
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// channel used for sending commands related to database operations
    backend: UnboundedSender<BackendRequest>,
    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
    ///
    /// There is only one instance of the type, so as soon as the last `SharedBackend` is deleted,
    /// `FlushJsonBlockCacheDB` is also deleted and the cache is flushed.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// The mode for the `SharedBackend` to block in place or not
    blocking_mode: BlockingMode,
}
728
729impl SharedBackend {
730    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
731    /// `SharedBackend`. Missing values get inserted in the `db`.
732    ///
733    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
734    /// dropped.
735    pub async fn spawn_backend<P: Provider<AnyNetwork> + 'static>(
736        provider: P,
737        db: BlockchainDb,
738        pin_block: Option<BlockId>,
739    ) -> Self {
740        let (shared, handler) = Self::new(provider, db, pin_block);
741        // spawn the provider handler to a task
742        trace!(target: "backendhandler", "spawning Backendhandler task");
743        tokio::spawn(handler);
744        shared
745    }
746
747    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
748    /// its own `tokio::Runtime`
749    pub fn spawn_backend_thread<P: Provider<AnyNetwork> + 'static>(
750        provider: P,
751        db: BlockchainDb,
752        pin_block: Option<BlockId>,
753    ) -> Self {
754        let (shared, handler) = Self::new(provider, db, pin_block);
755
756        // spawn a light-weight thread with a thread-local async runtime just for
757        // sending and receiving data from the remote client
758        std::thread::Builder::new()
759            .name("fork-backend".into())
760            .spawn(move || {
761                let rt = tokio::runtime::Builder::new_current_thread()
762                    .enable_all()
763                    .build()
764                    .expect("failed to build tokio runtime");
765
766                rt.block_on(handler);
767            })
768            .expect("failed to spawn thread");
769        trace!(target: "backendhandler", "spawned Backendhandler thread");
770
771        shared
772    }
773
774    /// Returns a new `SharedBackend` and the `BackendHandler`
775    pub fn new<P: Provider<AnyNetwork> + 'static>(
776        provider: P,
777        db: BlockchainDb,
778        pin_block: Option<BlockId>,
779    ) -> (Self, BackendHandler) {
780        let (backend, backend_rx) = unbounded();
781        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
782        let handler = BackendHandler::new(provider.erased(), db, backend_rx, pin_block);
783        (Self { backend, cache, blocking_mode: Default::default() }, handler)
784    }
785
786    /// Returns a new `SharedBackend` and the `BackendHandler` with a specific blocking mode
787    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
788        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
789    }
790
791    /// Updates the pinned block to fetch data from
792    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
793        let req = BackendRequest::SetPinnedBlock(block.into());
794        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
795    }
796
797    /// Returns the full block for the given block identifier
798    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
799        self.blocking_mode.run(|| {
800            let (sender, rx) = oneshot_channel();
801            let req = BackendRequest::FullBlock(block.into(), sender);
802            self.backend.unbounded_send(req)?;
803            rx.recv()?
804        })
805    }
806
807    /// Returns the transaction for the hash
808    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
809        self.blocking_mode.run(|| {
810            let (sender, rx) = oneshot_channel();
811            let req = BackendRequest::Transaction(tx, sender);
812            self.backend.unbounded_send(req)?;
813            rx.recv()?
814        })
815    }
816
817    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
818        self.blocking_mode.run(|| {
819            let (sender, rx) = oneshot_channel();
820            let req = BackendRequest::Basic(address, sender);
821            self.backend.unbounded_send(req)?;
822            rx.recv()?.map(Some)
823        })
824    }
825
826    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
827        self.blocking_mode.run(|| {
828            let (sender, rx) = oneshot_channel();
829            let req = BackendRequest::Storage(address, index, sender);
830            self.backend.unbounded_send(req)?;
831            rx.recv()?
832        })
833    }
834
835    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
836        self.blocking_mode.run(|| {
837            let (sender, rx) = oneshot_channel();
838            let req = BackendRequest::BlockHash(number, sender);
839            self.backend.unbounded_send(req)?;
840            rx.recv()?
841        })
842    }
843
844    /// Inserts or updates data for multiple addresses
845    pub fn insert_or_update_address(&self, address_data: AddressData) {
846        let req = BackendRequest::UpdateAddress(address_data);
847        let err = self.backend.unbounded_send(req);
848        match err {
849            Ok(_) => (),
850            Err(e) => {
851                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
852            }
853        }
854    }
855
856    /// Inserts or updates data for multiple storage slots
857    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
858        let req = BackendRequest::UpdateStorage(storage_data);
859        let err = self.backend.unbounded_send(req);
860        match err {
861            Ok(_) => (),
862            Err(e) => {
863                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
864            }
865        }
866    }
867
868    /// Inserts or updates data for multiple block hashes
869    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
870        let req = BackendRequest::UpdateBlockHash(block_hash_data);
871        let err = self.backend.unbounded_send(req);
872        match err {
873            Ok(_) => (),
874            Err(e) => {
875                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
876            }
877        }
878    }
879
880    /// Returns any arbitrary request on the provider
881    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
882    where
883        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
884        T: fmt::Debug + Send + 'static,
885    {
886        self.blocking_mode.run(|| {
887            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
888            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
889                sender,
890                future: Box::pin(fut),
891            }));
892            self.backend.unbounded_send(req)?;
893            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
894        })
895    }
896
    /// Flushes the DB to disk if caching is enabled
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the DB to a specific file
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns the DB
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns a clone of the DB accounts map.
    ///
    /// Note: this clones the entire map; use [`Self::accounts_len`] for a size check.
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the DB accounts length
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns a clone of the DB storage map.
    ///
    /// Note: this clones the entire map; use [`Self::storage_len`] for a size check.
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the DB storage length
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns a clone of the DB block_hashes map.
    ///
    /// Note: this clones the entire map; use [`Self::block_hashes_len`] for a size check.
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the DB block_hashes length
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
941}
942
943impl DatabaseRef for SharedBackend {
944    type Error = DatabaseError;
945
946    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
947        trace!(target: "sharedbackend", %address, "request basic");
948        self.do_get_basic(address).inspect_err(|err| {
949            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
950            if err.is_possibly_non_archive_node_error() {
951                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
952            }
953        })
954    }
955
956    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
957        Err(DatabaseError::MissingCode(hash))
958    }
959
960    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
961        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
962        self.do_get_storage(address, index).inspect_err(|err| {
963            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
964            if err.is_possibly_non_archive_node_error() {
965                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
966            }
967        })
968    }
969
970    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
971        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
972        self.do_get_block_hash(number).inspect_err(|err| {
973            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
974            if err.is_possibly_non_archive_node_error() {
975                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
976            }
977        })
978    }
979}
980
981#[cfg(test)]
982mod tests {
983    use super::*;
984    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
985    use alloy_consensus::BlockHeader;
986    use alloy_provider::ProviderBuilder;
987    use alloy_rpc_client::ClientBuilder;
988    use serde::Deserialize;
989    use std::{fs, path::PathBuf};
990    use tiny_http::{Response, Server};
991
992    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
993        ProviderBuilder::new()
994            .network::<AnyNetwork>()
995            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
996    }
997
998    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
999
    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
        let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);

        // The meta's block env must be pinned to the fetched block's number.
        assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
    }
1010
    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Fetch one slot and the account, then verify both were cached in the shared db.
        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        // Block hash lookups should be cached the same way.
        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        // Fetch slots 1..4 from another thread; combined with slot 0 above the cache
        // should end up holding exactly `max_slots` entries.
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }
1051
1052    #[test]
1053    fn can_read_cache() {
1054        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
1055        let json = JsonBlockCacheDB::load(cache_path).unwrap();
1056        assert!(!json.db().accounts.read().is_empty());
1057    }
1058
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Locally-crafted account info that should shadow whatever the remote returns.
        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
            account_id: None,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        // Read the account repeatedly from another thread; the injected values must be
        // returned both through the backend and from the shared db.
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {idx}"
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in instance of index {idx}"
                        );

                        // comparing with db
                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {idx}"
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in instance of index {idx}"
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }
1121
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Inject a handful of storage slots that should shadow the remote values.
        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        // Read the slots repeatedly from another thread; the injected values must be
        // returned both through the backend and from the shared db.
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {index} in address {address} do not have the same value"
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {index} in address {address} do not have the same value"
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {err}")
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }
1180
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // Inject synthetic hashes for blocks 1..=5 that should shadow the remote values.
        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        // Read the hashes from another thread; the injected values must be returned both
        // through the backend and from the shared db.
        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {key} did not match"
                        );

                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {key} did not match");
                    }
                    Err(err) => panic!("Hash not found, error: {err}"),
                }
            }
        });
        handle.join().unwrap();
    }
1229
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage_with_cache() {
        // Skipped unless `ETH_RPC_URL` is set: requires a live endpoint.
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        // Copy the fixture to a temporary file so the test can mutate and later delete it.
        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
        let backend =
            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;

        // some rng contract from etherscan
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Inject storage slots that should shadow both the remote and the cached values.
        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        // NOTE(review): `address_data` is computed but never used below — TODO confirm intent.
        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
        address_data.code = None;

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
        // overwrite the code with some raw bytes
        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));

        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let backend_clone = backend.clone();

        // Verify the injected slots from another thread, then flush the cache to disk.
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {index} in address {address} doesn't have the same value"
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {index} in address {address} doesn't have the same value"
                                );
                            }

                            Err(err) => {
                                panic!("There was a database error: {err}")
                            }
                        }
                    }
                }
            }

            backend_clone.flush_cache();
        });
        handle.join().unwrap();

        // read json and confirm the changes to the data

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let json_db = BlockchainDb::new(meta, Some(cache_path));

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        storage_data.insert(address, storage_info);

        // redo the checks with the data extracted from the json file
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = {
                            let storage = json_db.storage().read();
                            let address_storage = storage.get(address).unwrap().clone();
                            *address_storage.get(index).unwrap()
                        };

                        assert_eq!(
                            result_storage, *value,
                            "Storage in slot number {index} in address {address} doesn't have the same value"
                        );
                    }
                }
            }
        });

        handle.join().unwrap();

        // erase the temporary file
        fs::remove_file("test-data/storage-tmp.json").unwrap();
    }
1358
    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend_any_request() {
        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
        let endpoint = format!("http://{}", server.server_addr());

        // Spin an in-memory server that responds to "foo_callCustomMethod" rpc call.
        let expected_bytes_innner = expected_response_bytes.clone();
        let server_handle = std::thread::spawn(move || {
            // Minimal JSON-RPC request shape: only the method name matters here.
            #[derive(Debug, Deserialize)]
            struct Request {
                method: String,
            }
            let mut request = server.recv().unwrap();
            let rpc_request: Request =
                serde_json::from_reader(request.as_reader()).expect("failed parsing request");

            match rpc_request.method.as_str() {
                "foo_callCustomMethod" => request
                    .respond(Response::from_string(format!(
                        r#"{{"result": "{}"}}"#,
                        alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
                    )))
                    .unwrap(),
                _ => request
                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
                    .unwrap(),
            };
        });

        let provider = get_http_provider(&endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let provider_inner = provider.clone();
        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // Route a custom raw request through the backend and check the round-tripped bytes.
        let actual_response_bytes = backend
            .do_any_request(async move {
                let bytes: alloy_primitives::Bytes =
                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
                Ok(bytes)
            })
            .expect("failed performing any request");

        assert_eq!(actual_response_bytes, expected_response_bytes);

        server_handle.join().unwrap();
    }
1408}