Skip to main content

foundry_fork_db/
backend.rs

1//! Smart caching and deduplication of requests when using a forking provider.
2
3use crate::{
4    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5    error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9    network::{primitives::HeaderResponse, AnyNetwork, BlockResponse},
10    DynProvider, Network, Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16    pin_mut,
17    stream::Stream,
18    task::{Context, Poll},
19    Future, FutureExt,
20};
21use revm::{
22    database::DatabaseRef,
23    primitives::{
24        map::{hash_map::Entry, AddressHashMap, HashMap},
25        KECCAK_EMPTY,
26    },
27    state::{AccountInfo, Bytecode},
28};
29use std::{
30    collections::VecDeque,
31    fmt,
32    future::IntoFuture,
33    path::Path,
34    pin::Pin,
35    sync::{
36        atomic::{AtomicU8, Ordering},
37        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38        Arc,
39    },
40};
41use tokio::select;
42
/// Warning text logged when an error indicates that the user is trying to fork from an older
/// block through a non-archive node.
pub const NON_ARCHIVE_NODE_WARNING: &str = concat!(
    "It looks like you're trying to fork from an older block with a non-archive node which is ",
    "not supported. Please try to change your RPC url to an archive node if the issue persists.",
);
47
// Boxed future type aliases for the requests that are dispatched to the provider.
//
// Every future resolves to the outcome of the call together with the key it was issued for, so
// the handler can match the response to its listeners without keeping extra bookkeeping.

/// Resolves to the fetched `(balance, nonce, code)` triple (or the error) and the address that was
/// queried.
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
/// Resolves to the fetched storage value (or the error), the account address and the slot index.
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
/// Resolves to the fetched block hash (or the error) and the block number that was queried.
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
/// Resolves to the sender awaiting the block, the provider response (`None` when the provider
/// returned no block) and the block id that was queried.
type FullBlockFuture<Err, N = AnyNetwork> = Pin<
    Box<
        dyn Future<
                Output = (
                    FullBlockSender<N>,
                    Result<Option<<N as Network>::BlockResponse>, Err>,
                    BlockId,
                ),
            > + Send,
    >,
>;
/// Resolves to the sender awaiting the transaction, the provider response and the transaction
/// hash that was queried.
type TransactionFuture<Err, N = AnyNetwork> = Pin<
    Box<
        dyn Future<
                Output = (
                    TransactionSender<N>,
                    Result<<N as Network>::TransactionResponse, Err>,
                    B256,
                ),
            > + Send,
    >,
>;
76
// Channel halves the handler uses to answer the individual request kinds.

/// Answers a request for basic account info.
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
/// Answers a request for a single storage slot.
type StorageSender = OneshotSender<DatabaseResult<U256>>;
/// Answers a request for a block hash.
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
/// Answers a request for a full block.
type FullBlockSender<N = AnyNetwork> = OneshotSender<DatabaseResult<<N as Network>::BlockResponse>>;
/// Answers a request for a transaction.
type TransactionSender<N = AnyNetwork> =
    OneshotSender<DatabaseResult<<N as Network>::TransactionResponse>>;

// Bulk payloads carried by the cache update requests.

/// Account info keyed by address.
type AddressData = AddressHashMap<AccountInfo>;
/// Storage info keyed by address.
type StorageData = AddressHashMap<StorageInfo>;
/// Block hashes keyed by block number.
type BlockHashData = HashMap<U256, B256>;
87
// States tracking which endpoint(s) should be used when fetching account info.

/// Initial state: it is not yet known whether the endpoint supports `eth_getAccountInfo`, so the
/// single-call and the three-call variants are both attempted.
const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
/// Endpoints supports the non standard eth_getAccountInfo which is more efficient than sending 3
/// separate requests
const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
/// Use regular individual getCode, getNonce, getBalance calls
const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
95
/// A type-erased request: a boxed future paired with the channel its output is reported on.
struct AnyRequestFuture<T, Err> {
    /// Receives the future's output once it resolves.
    sender: OneshotSender<Result<T, Err>>,
    /// The work that is driven to completion by [`WrappedAnyRequest::poll_inner`].
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}

impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    /// Formats the request as a tuple struct showing only the sender; the boxed future is skipped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_tuple("AnyRequestFuture");
        repr.field(&self.sender);
        repr.finish()
    }
}

/// Object-safe view of an [`AnyRequestFuture`] that hides the output type of the wrapped future.
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    /// Drives the wrapped future; returns `Poll::Ready(())` once its output has been handed off.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

/// Implements [`WrappedAnyRequest`] for [`AnyRequestFuture`].
///
/// - `poll_inner` polls like a `Future`, but instead of yielding the `Result<T, Err>` it forwards
///   it over `sender` and yields `()`.
/// - Because the output never surfaces in the signature, requests with different `T` can be stored
///   behind the same `Box<dyn WrappedAnyRequest>`.
/// - The typed `sender` channel is what keeps the result type-safe for the caller.
impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
where
    T: fmt::Debug + Send + 'static,
    Err: fmt::Debug + Send + 'static,
{
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.future.as_mut().poll(cx).map(|output| {
            // A failed send only means the receiving half was dropped; nothing left to do then.
            let _ = self.sender.send(output);
        })
    }
}
134
/// Request variants that are executed by the provider
///
/// Each variant wraps the in-flight future of one provider call; the handler keeps these in its
/// list of pending requests and polls them until they resolve.
enum ProviderRequest<Err, N: Network = AnyNetwork> {
    /// Fetching balance, nonce and code of an account.
    Account(AccountFuture<Err>),
    /// Fetching a single storage slot.
    Storage(StorageFuture<Err>),
    /// Fetching the hash of a block.
    BlockHash(BlockHashFuture<Err>),
    /// Fetching a full block.
    FullBlock(FullBlockFuture<Err, N>),
    /// Fetching a transaction by hash.
    Transaction(TransactionFuture<Err, N>),
    /// A type-erased request that reports its own result once it is ready.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
144
/// The Request type the Backend listens for
///
/// Variants that carry a sender are answered over that sender; `SetPinnedBlock` and the `Update*`
/// variants only change the handler's state or cache and produce no reply.
#[derive(Debug)]
enum BackendRequest<N: Network = AnyNetwork> {
    /// Fetch the account info
    Basic(Address, AccountInfoSender),
    /// Fetch a storage slot
    Storage(Address, U256, StorageSender),
    /// Fetch a block hash
    BlockHash(u64, BlockHashSender),
    /// Fetch an entire block with transactions
    FullBlock(BlockId, FullBlockSender<N>),
    /// Fetch a transaction
    Transaction(B256, TransactionSender<N>),
    /// Sets the pinned block to fetch data from
    SetPinnedBlock(BlockId),

    /// Update Address data
    UpdateAddress(AddressData),
    /// Update Storage data
    UpdateStorage(StorageData),
    /// Update Block Hashes
    UpdateBlockHash(BlockHashData),
    /// Any other request
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
170
/// Handles an internal provider and listens for requests.
///
/// The handler is a future: it stays pending while the request channel is open and resolves as
/// soon as the channel is closed, i.e. once the last sender has been dropped.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<N: Network = AnyNetwork> {
    /// The provider all remote lookups are sent to.
    provider: DynProvider<N>,
    /// Stores all the data.
    db: BlockchainDb,
    /// Requests currently in progress
    pending_requests: Vec<ProviderRequest<eyre::Report, N>>,
    /// Listeners that wait for a `get_account` related response
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners that wait for a `get_storage_at` response
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners that wait for a `get_block` response
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming commands.
    incoming: UnboundedReceiver<BackendRequest<N>>,
    /// unprocessed queued requests
    queued_requests: VecDeque<BackendRequest<N>>,
    /// The block to fetch data from.
    // This is an `Option` so that we can have less code churn in the functions below
    block_id: Option<BlockId>,
    /// The mode for fetching account data
    // Holds one of the `ACCOUNT_FETCH_*` constants; shared with in-flight account futures, which
    // update it once they learn what the endpoint supports.
    account_fetch_mode: Arc<AtomicU8>,
}
198
199impl<N: Network> BackendHandler<N> {
200    fn new(
201        provider: DynProvider<N>,
202        db: BlockchainDb,
203        rx: UnboundedReceiver<BackendRequest<N>>,
204        block_id: Option<BlockId>,
205    ) -> Self {
206        Self {
207            provider,
208            db,
209            pending_requests: Default::default(),
210            account_requests: Default::default(),
211            storage_requests: Default::default(),
212            block_requests: Default::default(),
213            queued_requests: Default::default(),
214            incoming: rx,
215            block_id,
216            account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
217        }
218    }
219
220    /// handle the request in queue in the future.
221    ///
222    /// We always check:
223    ///  1. if the requested value is already stored in the cache, then answer the sender
224    ///  2. otherwise, fetch it via the provider but check if a request for that value is already in
225    ///     progress (e.g. another Sender just requested the same account)
226    fn on_request(&mut self, req: BackendRequest<N>) {
227        match req {
228            BackendRequest::Basic(addr, sender) => {
229                trace!(target: "backendhandler", "received request basic address={:?}", addr);
230                let acc = self.db.accounts().read().get(&addr).cloned();
231                if let Some(basic) = acc {
232                    let _ = sender.send(Ok(basic));
233                } else {
234                    self.request_account(addr, sender);
235                }
236            }
237            BackendRequest::BlockHash(number, sender) => {
238                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
239                if let Some(hash) = hash {
240                    let _ = sender.send(Ok(hash));
241                } else {
242                    self.request_hash(number, sender);
243                }
244            }
245            BackendRequest::FullBlock(number, sender) => {
246                self.request_full_block(number, sender);
247            }
248            BackendRequest::Transaction(tx, sender) => {
249                self.request_transaction(tx, sender);
250            }
251            BackendRequest::Storage(addr, idx, sender) => {
252                // account is already stored in the cache
253                let value =
254                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
255                if let Some(value) = value {
256                    let _ = sender.send(Ok(value));
257                } else {
258                    // account present but not storage -> fetch storage
259                    self.request_account_storage(addr, idx, sender);
260                }
261            }
262            BackendRequest::SetPinnedBlock(block_id) => {
263                self.block_id = Some(block_id);
264            }
265            BackendRequest::UpdateAddress(address_data) => {
266                for (address, data) in address_data {
267                    self.db.accounts().write().insert(address, data);
268                }
269            }
270            BackendRequest::UpdateStorage(storage_data) => {
271                for (address, data) in storage_data {
272                    self.db.storage().write().insert(address, data);
273                }
274            }
275            BackendRequest::UpdateBlockHash(block_hash_data) => {
276                for (block, hash) in block_hash_data {
277                    self.db.block_hashes().write().insert(block, hash);
278                }
279            }
280            BackendRequest::AnyRequest(fut) => {
281                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
282            }
283        }
284    }
285
286    /// process a request for account's storage
287    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
288        match self.storage_requests.entry((address, idx)) {
289            Entry::Occupied(mut entry) => {
290                entry.get_mut().push(listener);
291            }
292            Entry::Vacant(entry) => {
293                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
294                entry.insert(vec![listener]);
295                let provider = self.provider.clone();
296                let block_id = self.block_id.unwrap_or_default();
297                let fut = Box::pin(async move {
298                    let storage = provider
299                        .get_storage_at(address, idx)
300                        .block_id(block_id)
301                        .await
302                        .map_err(Into::into);
303                    (storage, address, idx)
304                });
305                self.pending_requests.push(ProviderRequest::Storage(fut));
306            }
307        }
308    }
309
310    /// returns the future that fetches the account data
311    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report, N> {
312        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
313
314        let provider = self.provider.clone();
315        let block_id = self.block_id.unwrap_or_default();
316        let mode = Arc::clone(&self.account_fetch_mode);
317        let fut = async move {
318            // depending on the tracked mode we can dispatch requests.
319            let initial_mode = mode.load(Ordering::Relaxed);
320            match initial_mode {
321                ACCOUNT_FETCH_UNCHECKED => {
322                    // single request for accountinfo object
323                    let acc_info_fut =
324                        provider.get_account_info(address).block_id(block_id).into_future();
325
326                    // tri request for account info
327                    let balance_fut =
328                        provider.get_balance(address).block_id(block_id).into_future();
329                    let nonce_fut =
330                        provider.get_transaction_count(address).block_id(block_id).into_future();
331                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
332                    let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
333                    pin_mut!(acc_info_fut, triple_fut);
334
335                    select! {
336                        acc_info = &mut acc_info_fut => {
337                            match acc_info {
338                                Ok(info) => {
339                                 trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
340                                    mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
341                                    Ok((info.balance, info.nonce, info.code))
342                                }
343                                Err(err) => {
344                                    trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
345                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
346                                    Ok(triple_fut.await?)
347                                }
348                            }
349                        }
350                        triple = &mut triple_fut => {
351                            match triple {
352                                Ok((balance, nonce, code)) => {
353                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
354                                    Ok((balance, nonce, code))
355                                }
356                                Err(err) => Err(err.into())
357                            }
358                        }
359                    }
360                }
361
362                ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
363                    let mut res = provider
364                        .get_account_info(address)
365                        .block_id(block_id)
366                        .into_future()
367                        .await
368                        .map(|info| (info.balance, info.nonce, info.code));
369
370                    // it's possible that the configured endpoint load balances requests to multiple
371                    // instances and not all support that endpoint so we should reset here
372                    if res.is_err() {
373                        mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
374
375                        let balance_fut =
376                            provider.get_balance(address).block_id(block_id).into_future();
377                        let nonce_fut = provider
378                            .get_transaction_count(address)
379                            .block_id(block_id)
380                            .into_future();
381                        let code_fut =
382                            provider.get_code_at(address).block_id(block_id).into_future();
383                        res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
384                    }
385
386                    Ok(res?)
387                }
388
389                ACCOUNT_FETCH_SEPARATE_REQUESTS => {
390                    let balance_fut =
391                        provider.get_balance(address).block_id(block_id).into_future();
392                    let nonce_fut =
393                        provider.get_transaction_count(address).block_id(block_id).into_future();
394                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
395
396                    Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
397                }
398
399                _ => unreachable!("Invalid account fetch mode"),
400            }
401        };
402
403        ProviderRequest::Account(Box::pin(async move {
404            let result = fut.await;
405            (result, address)
406        }))
407    }
408
409    /// process a request for an account
410    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
411        match self.account_requests.entry(address) {
412            Entry::Occupied(mut entry) => {
413                entry.get_mut().push(listener);
414            }
415            Entry::Vacant(entry) => {
416                entry.insert(vec![listener]);
417                self.pending_requests.push(self.get_account_req(address));
418            }
419        }
420    }
421
422    /// process a request for an entire block
423    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender<N>) {
424        let provider = self.provider.clone();
425        let fut = Box::pin(async move {
426            let block = provider
427                .get_block(number)
428                .full()
429                .await
430                .wrap_err(format!("could not fetch block {number:?}"));
431            (sender, block, number)
432        });
433
434        self.pending_requests.push(ProviderRequest::FullBlock(fut));
435    }
436
437    /// process a request for a transactions
438    fn request_transaction(&mut self, tx: B256, sender: TransactionSender<N>) {
439        let provider = self.provider.clone();
440        let fut = Box::pin(async move {
441            let block = provider
442                .get_transaction_by_hash(tx)
443                .await
444                .wrap_err_with(|| format!("could not get transaction {tx}"))
445                .and_then(|maybe| {
446                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
447                });
448            (sender, block, tx)
449        });
450
451        self.pending_requests.push(ProviderRequest::Transaction(fut));
452    }
453
454    /// process a request for a block hash
455    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
456        match self.block_requests.entry(number) {
457            Entry::Occupied(mut entry) => {
458                entry.get_mut().push(listener);
459            }
460            Entry::Vacant(entry) => {
461                trace!(target: "backendhandler", number, "preparing block hash request");
462                entry.insert(vec![listener]);
463                let provider = self.provider.clone();
464                let fut = Box::pin(async move {
465                    let block = provider
466                        .get_block_by_number(number.into())
467                        .hashes()
468                        .await
469                        .wrap_err("failed to get block");
470
471                    let block_hash = match block {
472                        Ok(Some(block)) => Ok(block.header().hash()),
473                        Ok(None) => {
474                            warn!(target: "backendhandler", ?number, "block not found");
475                            // if no block was returned then the block does not exist, in which case
476                            // we return empty hash
477                            Ok(KECCAK_EMPTY)
478                        }
479                        Err(err) => {
480                            error!(target: "backendhandler", %err, ?number, "failed to get block");
481                            Err(err)
482                        }
483                    };
484                    (block_hash, number)
485                });
486                self.pending_requests.push(ProviderRequest::BlockHash(fut));
487            }
488        }
489    }
490}
491
492impl<N: Network> Future for BackendHandler<N> {
493    type Output = ();
494
495    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
496        let pin = self.get_mut();
497        loop {
498            // Drain queued requests first.
499            while let Some(req) = pin.queued_requests.pop_front() {
500                pin.on_request(req)
501            }
502
503            // receive new requests to delegate to the underlying provider
504            loop {
505                match Pin::new(&mut pin.incoming).poll_next(cx) {
506                    Poll::Ready(Some(req)) => {
507                        pin.queued_requests.push_back(req);
508                    }
509                    Poll::Ready(None) => {
510                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
511                        return Poll::Ready(());
512                    }
513                    Poll::Pending => break,
514                }
515            }
516
517            // poll all requests in progress
518            for n in (0..pin.pending_requests.len()).rev() {
519                let mut request = pin.pending_requests.swap_remove(n);
520                match &mut request {
521                    ProviderRequest::Account(fut) => {
522                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
523                            // get the response
524                            let (balance, nonce, code) = match resp {
525                                Ok(res) => res,
526                                Err(err) => {
527                                    let err = Arc::new(err);
528                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
529                                        listeners.into_iter().for_each(|l| {
530                                            let _ = l.send(Err(DatabaseError::GetAccount(
531                                                addr,
532                                                Arc::clone(&err),
533                                            )));
534                                        })
535                                    }
536                                    continue;
537                                }
538                            };
539
540                            // convert it to revm-style types
541                            let (code, code_hash) = if !code.is_empty() {
542                                (code.clone(), keccak256(&code))
543                            } else {
544                                (Bytes::default(), KECCAK_EMPTY)
545                            };
546
547                            // update the cache
548                            let acc = AccountInfo {
549                                nonce,
550                                balance,
551                                code: Some(Bytecode::new_raw(code)),
552                                code_hash,
553                                account_id: None,
554                            };
555                            pin.db.accounts().write().insert(addr, acc.clone());
556
557                            // notify all listeners
558                            if let Some(listeners) = pin.account_requests.remove(&addr) {
559                                listeners.into_iter().for_each(|l| {
560                                    let _ = l.send(Ok(acc.clone()));
561                                })
562                            }
563                            continue;
564                        }
565                    }
566                    ProviderRequest::Storage(fut) => {
567                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
568                            let value = match resp {
569                                Ok(value) => value,
570                                Err(err) => {
571                                    // notify all listeners
572                                    let err = Arc::new(err);
573                                    if let Some(listeners) =
574                                        pin.storage_requests.remove(&(addr, idx))
575                                    {
576                                        listeners.into_iter().for_each(|l| {
577                                            let _ = l.send(Err(DatabaseError::GetStorage(
578                                                addr,
579                                                idx,
580                                                Arc::clone(&err),
581                                            )));
582                                        })
583                                    }
584                                    continue;
585                                }
586                            };
587
588                            // update the cache
589                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);
590
591                            // notify all listeners
592                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
593                                listeners.into_iter().for_each(|l| {
594                                    let _ = l.send(Ok(value));
595                                })
596                            }
597                            continue;
598                        }
599                    }
600                    ProviderRequest::BlockHash(fut) => {
601                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
602                            let value = match block_hash {
603                                Ok(value) => value,
604                                Err(err) => {
605                                    let err = Arc::new(err);
606                                    // notify all listeners
607                                    if let Some(listeners) = pin.block_requests.remove(&number) {
608                                        listeners.into_iter().for_each(|l| {
609                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
610                                                number,
611                                                Arc::clone(&err),
612                                            )));
613                                        })
614                                    }
615                                    continue;
616                                }
617                            };
618
619                            // update the cache
620                            pin.db.block_hashes().write().insert(U256::from(number), value);
621
622                            // notify all listeners
623                            if let Some(listeners) = pin.block_requests.remove(&number) {
624                                listeners.into_iter().for_each(|l| {
625                                    let _ = l.send(Ok(value));
626                                })
627                            }
628                            continue;
629                        }
630                    }
631                    ProviderRequest::FullBlock(fut) => {
632                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
633                            let msg = match resp {
634                                Ok(Some(block)) => Ok(block),
635                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
636                                Err(err) => {
637                                    let err = Arc::new(err);
638                                    Err(DatabaseError::GetFullBlock(number, err))
639                                }
640                            };
641                            let _ = sender.send(msg);
642                            continue;
643                        }
644                    }
645                    ProviderRequest::Transaction(fut) => {
646                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
647                            let msg = match tx {
648                                Ok(tx) => Ok(tx),
649                                Err(err) => {
650                                    let err = Arc::new(err);
651                                    Err(DatabaseError::GetTransaction(tx_hash, err))
652                                }
653                            };
654                            let _ = sender.send(msg);
655                            continue;
656                        }
657                    }
658                    ProviderRequest::AnyRequest(fut) => {
659                        if fut.poll_inner(cx).is_ready() {
660                            continue;
661                        }
662                    }
663                }
664                // not ready, insert and poll again
665                pin.pending_requests.push(request);
666            }
667
668            // If no new requests have been queued, break to
669            // be polled again later.
670            if pin.queued_requests.is_empty() {
671                return Poll::Pending;
672            }
673        }
674    }
675}
676
677/// Mode for the `SharedBackend` how to block in the non-async [`DatabaseRef`] when interacting with
678/// [`BackendHandler`].
679#[derive(Default, Clone, Debug, PartialEq)]
680pub enum BlockingMode {
681    /// This mode use `tokio::task::block_in_place()` to block in place.
682    ///
683    /// This should be used when blocking on the call site is disallowed.
684    #[default]
685    BlockInPlace,
686    /// The mode blocks the current task
687    ///
688    /// This can be used if blocking on the call site is allowed, e.g. on a tokio blocking task.
689    Block,
690}
691
692impl BlockingMode {
693    /// run process logic with the blocking mode
694    pub fn run<F, R>(&self, f: F) -> R
695    where
696        F: FnOnce() -> R,
697    {
698        match self {
699            Self::BlockInPlace => tokio::task::block_in_place(f),
700            Self::Block => f(),
701        }
702    }
703}
704
/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend type is connected to the `BackendHandler` via an unbounded mpsc channel. The
/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
/// `Backend` type is thread safe.
///
/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
/// `BackendHandler`. All `BackendRequest` variants that expect an answer include the sender half
/// of an additional channel that is used by the `BackendHandler` to send the result of an executed
/// `BackendRequest` back to the `SharedBackend`.
///
/// The `BackendHandler` holds a `Provider` to look up missing accounts or storage slots
/// from remote (e.g. infura). It detects duplicate requests from multiple `SharedBackend`s and
/// bundles them together, so that only one provider request is executed at a time for the same
/// data. For example, there are two `SharedBackend`s, `A` and `B`, that both request the basic
/// account info of account `0xasd9sa7d...` at the same time. After the `BackendHandler` receives
/// the request from `A`, it sends a new provider request to the provider's endpoint, then it reads
/// the identical request from `B` and simply adds it as an additional listener for the request
/// already in progress, instead of sending another one. Once the provider returns the response,
/// all listeners (`A` and `B`) get notified.
// **Note**: the implementation makes use of [tokio::task::block_in_place()] when interacting with
// the underlying [BackendHandler] which runs on a separate spawned tokio task.
// [tokio::task::block_in_place()]
// > Runs the provided blocking function on the current thread without blocking the executor.
// This prevents issues (hangs) we ran into where the [SharedBackend] itself is called from a
// spawned task.
#[derive(Clone, Debug)]
pub struct SharedBackend<N: Network = AnyNetwork> {
    /// Channel used for sending database-related commands to the `BackendHandler`.
    backend: UnboundedSender<BackendRequest<N>>,
    /// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
    ///
    /// There is only one instance of the type, so as soon as the last `SharedBackend` is dropped,
    /// `FlushJsonBlockCacheDB` is also dropped and the cache is flushed.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// How this `SharedBackend` blocks while waiting for a response (see [`BlockingMode`]).
    blocking_mode: BlockingMode,
}
746
747impl<N: Network> SharedBackend<N> {
748    /// _Spawns_ a new `BackendHandler` on a `tokio::task` that listens for requests from any
749    /// `SharedBackend`. Missing values get inserted in the `db`.
750    ///
751    /// The spawned `BackendHandler` finishes once the last `SharedBackend` connected to it is
752    /// dropped.
753    pub async fn spawn_backend<P: Provider<N> + 'static>(
754        provider: P,
755        db: BlockchainDb,
756        pin_block: Option<BlockId>,
757    ) -> Self {
758        let (shared, handler) = Self::new(provider, db, pin_block);
759        // spawn the provider handler to a task
760        trace!(target: "backendhandler", "spawning Backendhandler task");
761        tokio::spawn(handler);
762        shared
763    }
764
765    /// Same as `Self::spawn_backend` but spawns the `BackendHandler` on a separate `std::thread` in
766    /// its own `tokio::Runtime`
767    pub fn spawn_backend_thread<P: Provider<N> + 'static>(
768        provider: P,
769        db: BlockchainDb,
770        pin_block: Option<BlockId>,
771    ) -> Self {
772        let (shared, handler) = Self::new(provider, db, pin_block);
773
774        // spawn a light-weight thread with a thread-local async runtime just for
775        // sending and receiving data from the remote client
776        std::thread::Builder::new()
777            .name("fork-backend".into())
778            .spawn(move || {
779                let rt = tokio::runtime::Builder::new_current_thread()
780                    .enable_all()
781                    .build()
782                    .expect("failed to build tokio runtime");
783
784                rt.block_on(handler);
785            })
786            .expect("failed to spawn thread");
787        trace!(target: "backendhandler", "spawned Backendhandler thread");
788
789        shared
790    }
791
792    /// Returns a new `SharedBackend` and the `BackendHandler`
793    pub fn new<P: Provider<N> + 'static>(
794        provider: P,
795        db: BlockchainDb,
796        pin_block: Option<BlockId>,
797    ) -> (Self, BackendHandler<N>) {
798        let (backend, backend_rx) = unbounded();
799        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
800        let handler = BackendHandler::new(provider.erased(), db, backend_rx, pin_block);
801        (Self { backend, cache, blocking_mode: Default::default() }, handler)
802    }
803
804    /// Returns a new `SharedBackend` and the `BackendHandler` with a specific blocking mode
805    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
806        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
807    }
808
809    /// Updates the pinned block to fetch data from
810    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
811        let req = BackendRequest::SetPinnedBlock(block.into());
812        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
813    }
814
815    /// Returns the full block for the given block identifier
816    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<N::BlockResponse> {
817        self.blocking_mode.run(|| {
818            let (sender, rx) = oneshot_channel();
819            let req = BackendRequest::FullBlock(block.into(), sender);
820            self.backend.unbounded_send(req)?;
821            rx.recv()?
822        })
823    }
824
825    /// Returns the transaction for the hash
826    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<N::TransactionResponse> {
827        self.blocking_mode.run(|| {
828            let (sender, rx) = oneshot_channel();
829            let req = BackendRequest::Transaction(tx, sender);
830            self.backend.unbounded_send(req)?;
831            rx.recv()?
832        })
833    }
834
835    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
836        self.blocking_mode.run(|| {
837            let (sender, rx) = oneshot_channel();
838            let req = BackendRequest::Basic(address, sender);
839            self.backend.unbounded_send(req)?;
840            rx.recv()?.map(Some)
841        })
842    }
843
844    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
845        self.blocking_mode.run(|| {
846            let (sender, rx) = oneshot_channel();
847            let req = BackendRequest::Storage(address, index, sender);
848            self.backend.unbounded_send(req)?;
849            rx.recv()?
850        })
851    }
852
853    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
854        self.blocking_mode.run(|| {
855            let (sender, rx) = oneshot_channel();
856            let req = BackendRequest::BlockHash(number, sender);
857            self.backend.unbounded_send(req)?;
858            rx.recv()?
859        })
860    }
861
862    /// Inserts or updates data for multiple addresses
863    pub fn insert_or_update_address(&self, address_data: AddressData) {
864        let req = BackendRequest::UpdateAddress(address_data);
865        let err = self.backend.unbounded_send(req);
866        match err {
867            Ok(_) => (),
868            Err(e) => {
869                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
870            }
871        }
872    }
873
874    /// Inserts or updates data for multiple storage slots
875    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
876        let req = BackendRequest::UpdateStorage(storage_data);
877        let err = self.backend.unbounded_send(req);
878        match err {
879            Ok(_) => (),
880            Err(e) => {
881                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
882            }
883        }
884    }
885
886    /// Inserts or updates data for multiple block hashes
887    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
888        let req = BackendRequest::UpdateBlockHash(block_hash_data);
889        let err = self.backend.unbounded_send(req);
890        match err {
891            Ok(_) => (),
892            Err(e) => {
893                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
894            }
895        }
896    }
897
898    /// Returns any arbitrary request on the provider
899    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
900    where
901        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
902        T: fmt::Debug + Send + 'static,
903    {
904        self.blocking_mode.run(|| {
905            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
906            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
907                sender,
908                future: Box::pin(fut),
909            }));
910            self.backend.unbounded_send(req)?;
911            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
912        })
913    }
914
915    /// Flushes the DB to disk if caching is enabled
916    pub fn flush_cache(&self) {
917        self.cache.0.flush();
918    }
919
920    /// Flushes the DB to a specific file
921    pub fn flush_cache_to(&self, cache_path: &Path) {
922        self.cache.0.flush_to(cache_path);
923    }
924
925    /// Returns the DB
926    pub fn data(&self) -> Arc<MemDb> {
927        self.cache.0.db().clone()
928    }
929
930    /// Returns the DB accounts
931    pub fn accounts(&self) -> AddressData {
932        self.cache.0.db().accounts.read().clone()
933    }
934
935    /// Returns the DB accounts length
936    pub fn accounts_len(&self) -> usize {
937        self.cache.0.db().accounts.read().len()
938    }
939
940    /// Returns the DB storage
941    pub fn storage(&self) -> StorageData {
942        self.cache.0.db().storage.read().clone()
943    }
944
945    /// Returns the DB storage length
946    pub fn storage_len(&self) -> usize {
947        self.cache.0.db().storage.read().len()
948    }
949
950    /// Returns the DB block_hashes
951    pub fn block_hashes(&self) -> BlockHashData {
952        self.cache.0.db().block_hashes.read().clone()
953    }
954
955    /// Returns the DB block_hashes length
956    pub fn block_hashes_len(&self) -> usize {
957        self.cache.0.db().block_hashes.read().len()
958    }
959}
960
961impl<N: Network> DatabaseRef for SharedBackend<N> {
962    type Error = DatabaseError;
963
964    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
965        trace!(target: "sharedbackend", %address, "request basic");
966        self.do_get_basic(address).inspect_err(|err| {
967            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
968            if err.is_possibly_non_archive_node_error() {
969                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
970            }
971        })
972    }
973
974    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
975        Err(DatabaseError::MissingCode(hash))
976    }
977
978    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
979        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
980        self.do_get_storage(address, index).inspect_err(|err| {
981            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
982            if err.is_possibly_non_archive_node_error() {
983                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
984            }
985        })
986    }
987
988    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
989        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
990        self.do_get_block_hash(number).inspect_err(|err| {
991            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
992            if err.is_possibly_non_archive_node_error() {
993                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
994            }
995        })
996    }
997}
998
999#[cfg(test)]
1000mod tests {
1001    use super::*;
1002    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
1003    use alloy_consensus::BlockHeader;
1004    use alloy_provider::ProviderBuilder;
1005    use alloy_rpc_client::ClientBuilder;
1006    use serde::Deserialize;
1007    use std::{fs, path::PathBuf};
1008    use tiny_http::{Response, Server};
1009
1010    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
1011        ProviderBuilder::new()
1012            .network::<AnyNetwork>()
1013            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
1014    }
1015
    /// RPC endpoint read from the `ETH_RPC_URL` environment variable at compile time. The tests
    /// that use it return early when it is `None`.
    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
1017
1018    #[tokio::test(flavor = "multi_thread")]
1019    async fn test_builder() {
1020        let Some(endpoint) = ENDPOINT else { return };
1021        let provider = get_http_provider(endpoint);
1022
1023        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
1024        let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
1025
1026        assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
1027    }
1028
1029    #[tokio::test(flavor = "multi_thread")]
1030    async fn shared_backend() {
1031        let Some(endpoint) = ENDPOINT else { return };
1032
1033        let provider = get_http_provider(endpoint);
1034        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1035
1036        let db = BlockchainDb::new(meta, None);
1037        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1038
1039        // some rng contract from etherscan
1040        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1041
1042        let idx = U256::from(0u64);
1043        let value = backend.storage_ref(address, idx).unwrap();
1044        let account = backend.basic_ref(address).unwrap().unwrap();
1045
1046        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
1047        assert_eq!(account.balance, mem_acc.balance);
1048        assert_eq!(account.nonce, mem_acc.nonce);
1049        let slots = db.storage().read().get(&address).unwrap().clone();
1050        assert_eq!(slots.len(), 1);
1051        assert_eq!(slots.get(&idx).copied().unwrap(), value);
1052
1053        let num = 10u64;
1054        let hash = backend.block_hash_ref(num).unwrap();
1055        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
1056        assert_eq!(hash, mem_hash);
1057
1058        let max_slots = 5;
1059        let handle = std::thread::spawn(move || {
1060            for i in 1..max_slots {
1061                let idx = U256::from(i);
1062                let _ = backend.storage_ref(address, idx);
1063            }
1064        });
1065        handle.join().unwrap();
1066        let slots = db.storage().read().get(&address).unwrap().clone();
1067        assert_eq!(slots.len() as u64, max_slots);
1068    }
1069
1070    #[test]
1071    fn can_read_cache() {
1072        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
1073        let json = JsonBlockCacheDB::load(cache_path).unwrap();
1074        assert!(!json.db().accounts.read().is_empty());
1075    }
1076
1077    #[tokio::test(flavor = "multi_thread")]
1078    async fn can_modify_address() {
1079        let Some(endpoint) = ENDPOINT else { return };
1080
1081        let provider = get_http_provider(endpoint);
1082        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1083
1084        let db = BlockchainDb::new(meta, None);
1085        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1086
1087        // some rng contract from etherscan
1088        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1089
1090        let new_acc = AccountInfo {
1091            nonce: 1000u64,
1092            balance: U256::from(2000),
1093            code: None,
1094            code_hash: KECCAK_EMPTY,
1095            account_id: None,
1096        };
1097        let mut account_data = AddressData::default();
1098        account_data.insert(address, new_acc.clone());
1099
1100        backend.insert_or_update_address(account_data);
1101
1102        let max_slots = 5;
1103        let handle = std::thread::spawn(move || {
1104            for i in 1..max_slots {
1105                let idx = U256::from(i);
1106                let result_address = backend.basic_ref(address).unwrap();
1107                match result_address {
1108                    Some(acc) => {
1109                        assert_eq!(
1110                            acc.nonce, new_acc.nonce,
1111                            "The nonce was not changed in instance of index {idx}"
1112                        );
1113                        assert_eq!(
1114                            acc.balance, new_acc.balance,
1115                            "The balance was not changed in instance of index {idx}"
1116                        );
1117
1118                        // comparing with db
1119                        let db_address = {
1120                            let accounts = db.accounts().read();
1121                            accounts.get(&address).unwrap().clone()
1122                        };
1123
1124                        assert_eq!(
1125                            db_address.nonce, new_acc.nonce,
1126                            "The nonce was not changed in instance of index {idx}"
1127                        );
1128                        assert_eq!(
1129                            db_address.balance, new_acc.balance,
1130                            "The balance was not changed in instance of index {idx}"
1131                        );
1132                    }
1133                    None => panic!("Account not found"),
1134                }
1135            }
1136        });
1137        handle.join().unwrap();
1138    }
1139
1140    #[tokio::test(flavor = "multi_thread")]
1141    async fn can_modify_storage() {
1142        let Some(endpoint) = ENDPOINT else { return };
1143
1144        let provider = get_http_provider(endpoint);
1145        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1146
1147        let db = BlockchainDb::new(meta, None);
1148        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1149
1150        // some rng contract from etherscan
1151        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1152
1153        let mut storage_data = StorageData::default();
1154        let mut storage_info = StorageInfo::default();
1155        storage_info.insert(U256::from(20), U256::from(10));
1156        storage_info.insert(U256::from(30), U256::from(15));
1157        storage_info.insert(U256::from(40), U256::from(20));
1158
1159        storage_data.insert(address, storage_info);
1160
1161        backend.insert_or_update_storage(storage_data.clone());
1162
1163        let max_slots = 5;
1164        let handle = std::thread::spawn(move || {
1165            for _ in 1..max_slots {
1166                for (address, info) in &storage_data {
1167                    for (index, value) in info {
1168                        let result_storage = backend.do_get_storage(*address, *index);
1169                        match result_storage {
1170                            Ok(stg_db) => {
1171                                assert_eq!(
1172                                    stg_db, *value,
1173                                    "Storage in slot number {index} in address {address} do not have the same value"
1174                                );
1175
1176                                let db_result = {
1177                                    let storage = db.storage().read();
1178                                    let address_storage = storage.get(address).unwrap();
1179                                    *address_storage.get(index).unwrap()
1180                                };
1181
1182                                assert_eq!(
1183                                    stg_db, db_result,
1184                                    "Storage in slot number {index} in address {address} do not have the same value"
1185                                )
1186                            }
1187
1188                            Err(err) => {
1189                                panic!("There was a database error: {err}")
1190                            }
1191                        }
1192                    }
1193                }
1194            }
1195        });
1196        handle.join().unwrap();
1197    }
1198
1199    #[tokio::test(flavor = "multi_thread")]
1200    async fn can_modify_block_hashes() {
1201        let Some(endpoint) = ENDPOINT else { return };
1202
1203        let provider = get_http_provider(endpoint);
1204        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1205
1206        let db = BlockchainDb::new(meta, None);
1207        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1208
1209        // some rng contract from etherscan
1210        // let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1211
1212        let mut block_hash_data = BlockHashData::default();
1213        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1214        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1215        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1216        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1217        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1218
1219        backend.insert_or_update_block_hashes(block_hash_data.clone());
1220
1221        let max_slots: u64 = 5;
1222        let handle = std::thread::spawn(move || {
1223            for i in 1..max_slots {
1224                let key = U256::from(i);
1225                let result_hash = backend.do_get_block_hash(i);
1226                match result_hash {
1227                    Ok(hash) => {
1228                        assert_eq!(
1229                            hash,
1230                            *block_hash_data.get(&key).unwrap(),
1231                            "The hash in block {key} did not match"
1232                        );
1233
1234                        let db_result = {
1235                            let hashes = db.block_hashes().read();
1236                            *hashes.get(&key).unwrap()
1237                        };
1238
1239                        assert_eq!(hash, db_result, "The hash in block {key} did not match");
1240                    }
1241                    Err(err) => panic!("Hash not found, error: {err}"),
1242                }
1243            }
1244        });
1245        handle.join().unwrap();
1246    }
1247
1248    #[tokio::test(flavor = "multi_thread")]
1249    async fn can_modify_storage_with_cache() {
1250        let Some(endpoint) = ENDPOINT else { return };
1251
1252        let provider = get_http_provider(endpoint);
1253        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1254
1255        // create a temporary file
1256        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1257
1258        let cache_path =
1259            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1260
1261        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1262        let backend =
1263            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1264
1265        // some rng contract from etherscan
1266        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1267
1268        let mut storage_data = StorageData::default();
1269        let mut storage_info = StorageInfo::default();
1270        storage_info.insert(U256::from(1), U256::from(10));
1271        storage_info.insert(U256::from(2), U256::from(15));
1272        storage_info.insert(U256::from(3), U256::from(20));
1273        storage_info.insert(U256::from(4), U256::from(20));
1274        storage_info.insert(U256::from(5), U256::from(15));
1275        storage_info.insert(U256::from(6), U256::from(10));
1276
1277        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1278        address_data.code = None;
1279
1280        storage_data.insert(address, storage_info);
1281
1282        backend.insert_or_update_storage(storage_data.clone());
1283
1284        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1285        // nullify the code
1286        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1287
1288        let mut account_data = AddressData::default();
1289        account_data.insert(address, new_acc.clone());
1290
1291        backend.insert_or_update_address(account_data);
1292
1293        let backend_clone = backend.clone();
1294
1295        let max_slots = 5;
1296        let handle = std::thread::spawn(move || {
1297            for _ in 1..max_slots {
1298                for (address, info) in &storage_data {
1299                    for (index, value) in info {
1300                        let result_storage = backend.do_get_storage(*address, *index);
1301                        match result_storage {
1302                            Ok(stg_db) => {
1303                                assert_eq!(
1304                                    stg_db, *value,
1305                                    "Storage in slot number {index} in address {address} doesn't have the same value"
1306                                );
1307
1308                                let db_result = {
1309                                    let storage = db.storage().read();
1310                                    let address_storage = storage.get(address).unwrap();
1311                                    *address_storage.get(index).unwrap()
1312                                };
1313
1314                                assert_eq!(
1315                                    stg_db, db_result,
1316                                    "Storage in slot number {index} in address {address} doesn't have the same value"
1317                                );
1318                            }
1319
1320                            Err(err) => {
1321                                panic!("There was a database error: {err}")
1322                            }
1323                        }
1324                    }
1325                }
1326            }
1327
1328            backend_clone.flush_cache();
1329        });
1330        handle.join().unwrap();
1331
1332        // read json and confirm the changes to the data
1333
1334        let cache_path =
1335            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1336
1337        let json_db = BlockchainDb::new(meta, Some(cache_path));
1338
1339        let mut storage_data = StorageData::default();
1340        let mut storage_info = StorageInfo::default();
1341        storage_info.insert(U256::from(1), U256::from(10));
1342        storage_info.insert(U256::from(2), U256::from(15));
1343        storage_info.insert(U256::from(3), U256::from(20));
1344        storage_info.insert(U256::from(4), U256::from(20));
1345        storage_info.insert(U256::from(5), U256::from(15));
1346        storage_info.insert(U256::from(6), U256::from(10));
1347
1348        storage_data.insert(address, storage_info);
1349
1350        // redo the checks with the data extracted from the json file
1351        let max_slots = 5;
1352        let handle = std::thread::spawn(move || {
1353            for _ in 1..max_slots {
1354                for (address, info) in &storage_data {
1355                    for (index, value) in info {
1356                        let result_storage = {
1357                            let storage = json_db.storage().read();
1358                            let address_storage = storage.get(address).unwrap().clone();
1359                            *address_storage.get(index).unwrap()
1360                        };
1361
1362                        assert_eq!(
1363                            result_storage, *value,
1364                            "Storage in slot number {index} in address {address} doesn't have the same value"
1365                        );
1366                    }
1367                }
1368            }
1369        });
1370
1371        handle.join().unwrap();
1372
1373        // erase the temporary file
1374        fs::remove_file("test-data/storage-tmp.json").unwrap();
1375    }
1376
1377    #[tokio::test(flavor = "multi_thread")]
1378    async fn shared_backend_any_request() {
1379        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1380        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1381        let endpoint = format!("http://{}", server.server_addr());
1382
1383        // Spin an in-memory server that responds to "foo_callCustomMethod" rpc call.
1384        let expected_bytes_innner = expected_response_bytes.clone();
1385        let server_handle = std::thread::spawn(move || {
1386            #[derive(Debug, Deserialize)]
1387            struct Request {
1388                method: String,
1389            }
1390            let mut request = server.recv().unwrap();
1391            let rpc_request: Request =
1392                serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1393
1394            match rpc_request.method.as_str() {
1395                "foo_callCustomMethod" => request
1396                    .respond(Response::from_string(format!(
1397                        r#"{{"result": "{}"}}"#,
1398                        alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1399                    )))
1400                    .unwrap(),
1401                _ => request
1402                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1403                    .unwrap(),
1404            };
1405        });
1406
1407        let provider = get_http_provider(&endpoint);
1408        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1409
1410        let db = BlockchainDb::new(meta, None);
1411        let provider_inner = provider.clone();
1412        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1413
1414        let actual_response_bytes = backend
1415            .do_any_request(async move {
1416                let bytes: alloy_primitives::Bytes =
1417                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1418                Ok(bytes)
1419            })
1420            .expect("failed performing any request");
1421
1422        assert_eq!(actual_response_bytes, expected_response_bytes);
1423
1424        server_handle.join().unwrap();
1425    }
1426}