1use crate::{
4 cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5 error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9 network::{primitives::HeaderResponse, AnyNetwork, BlockResponse},
10 DynProvider, Network, Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16 pin_mut,
17 stream::Stream,
18 task::{Context, Poll},
19 Future, FutureExt,
20};
21use revm::{
22 database::DatabaseRef,
23 primitives::{
24 map::{hash_map::Entry, AddressHashMap, HashMap},
25 KECCAK_EMPTY,
26 },
27 state::{AccountInfo, Bytecode},
28};
29use std::{
30 collections::VecDeque,
31 fmt,
32 future::IntoFuture,
33 path::Path,
34 pin::Pin,
35 sync::{
36 atomic::{AtomicU8, Ordering},
37 mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38 Arc,
39 },
40};
41use tokio::select;
42
/// Logged when a state request fails in a way that suggests the configured RPC
/// endpoint is not an archive node and cannot serve historical state.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";
47
/// Future resolving to `(balance, nonce, code)` for an account, tagged with its address.
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
/// Future resolving to a storage value, tagged with the queried `(address, slot)`.
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
/// Future resolving to a block hash, tagged with the queried block number.
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
/// Future resolving to a full block; carries the response channel so the result can be
/// delivered straight to the requester.
type FullBlockFuture<Err, N = AnyNetwork> = Pin<
    Box<
        dyn Future<
                Output = (
                    FullBlockSender<N>,
                    Result<Option<<N as Network>::BlockResponse>, Err>,
                    BlockId,
                ),
            > + Send,
    >,
>;
/// Future resolving to a transaction; carries the response channel so the result can be
/// delivered straight to the requester.
type TransactionFuture<Err, N = AnyNetwork> = Pin<
    Box<
        dyn Future<
                Output = (
                    TransactionSender<N>,
                    Result<<N as Network>::TransactionResponse, Err>,
                    B256,
                ),
            > + Send,
    >,
>;

/// Channel delivering a fetched [`AccountInfo`] back to the requester.
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
/// Channel delivering a fetched storage value back to the requester.
type StorageSender = OneshotSender<DatabaseResult<U256>>;
/// Channel delivering a fetched block hash back to the requester.
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
/// Channel delivering a fetched full block back to the requester.
type FullBlockSender<N = AnyNetwork> = OneshotSender<DatabaseResult<<N as Network>::BlockResponse>>;
/// Channel delivering a fetched transaction back to the requester.
type TransactionSender<N = AnyNetwork> =
    OneshotSender<DatabaseResult<<N as Network>::TransactionResponse>>;

/// Account info keyed by address.
type AddressData = AddressHashMap<AccountInfo>;
/// Storage slots keyed by address.
type StorageData = AddressHashMap<StorageInfo>;
/// Block hashes keyed by block number.
type BlockHashData = HashMap<U256, B256>;
87
/// Account fetch strategy not probed yet; the first request races `eth_getAccountInfo`
/// against the three separate calls (see `get_account_req`).
const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
/// The endpoint answered `eth_getAccountInfo`, so accounts are fetched with one call.
const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
/// Balance, nonce and code are fetched with three separate requests.
const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
95
/// Pairs an arbitrary provider future with the channel that receives its result.
struct AnyRequestFuture<T, Err> {
    // The receiving half is held by the caller of `SharedBackend::do_any_request`.
    sender: OneshotSender<Result<T, Err>>,
    // The request itself; polled by the `BackendHandler` event loop.
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}
100
impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    // Only the sender is shown: the boxed future has no `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
    }
}
106
/// Object-safe wrapper around [`AnyRequestFuture`] that erases the concrete
/// `T`/`Err` types so heterogeneous requests can share one queue.
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    /// Polls the inner future; on completion the result is forwarded through the
    /// stored sender and `Poll::Ready(())` is returned.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
110
111impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
120where
121 T: fmt::Debug + Send + 'static,
122 Err: fmt::Debug + Send + 'static,
123{
124 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
125 match self.future.poll_unpin(cx) {
126 Poll::Ready(result) => {
127 let _ = self.sender.send(result);
128 Poll::Ready(())
129 }
130 Poll::Pending => Poll::Pending,
131 }
132 }
133}
134
/// An in-flight provider request tracked by the [`BackendHandler`].
enum ProviderRequest<Err, N: Network = AnyNetwork> {
    /// Fetch of `(balance, nonce, code)` for an address.
    Account(AccountFuture<Err>),
    /// Fetch of a single storage slot.
    Storage(StorageFuture<Err>),
    /// Fetch of a block hash by number.
    BlockHash(BlockHashFuture<Err>),
    /// Fetch of a full block.
    FullBlock(FullBlockFuture<Err, N>),
    /// Fetch of a transaction by hash.
    Transaction(TransactionFuture<Err, N>),
    /// A caller-supplied, type-erased request.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
144
/// Requests sent from a [`SharedBackend`] to the [`BackendHandler`].
#[derive(Debug)]
enum BackendRequest<N: Network = AnyNetwork> {
    /// Fetch basic account info for the address.
    Basic(Address, AccountInfoSender),
    /// Fetch a storage slot of the address.
    Storage(Address, U256, StorageSender),
    /// Fetch the hash of the block with the given number.
    BlockHash(u64, BlockHashSender),
    /// Fetch an entire block by id.
    FullBlock(BlockId, FullBlockSender<N>),
    /// Fetch a transaction by hash.
    Transaction(B256, TransactionSender<N>),
    /// Pin all subsequent state lookups to the given block.
    SetPinnedBlock(BlockId),
    /// Insert or overwrite cached account data.
    UpdateAddress(AddressData),
    /// Insert or overwrite cached storage data.
    UpdateStorage(StorageData),
    /// Insert or overwrite cached block hashes.
    UpdateBlockHash(BlockHashData),
    /// Run an arbitrary, type-erased provider request.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
170
/// Drives a provider and serves requests arriving from [`SharedBackend`] clones.
///
/// As a future it completes only when the last request sender is dropped. Identical
/// in-flight fetches are deduplicated by registering additional listeners.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<N: Network = AnyNetwork> {
    provider: DynProvider<N>,
    /// Shared cache of fetched accounts, storage slots and block hashes.
    db: BlockchainDb,
    /// Provider requests currently in flight.
    pending_requests: Vec<ProviderRequest<eyre::Report, N>>,
    /// Listeners waiting for an account fetch, keyed by address.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for a storage fetch, keyed by `(address, slot)`.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for a block-hash fetch, keyed by block number.
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming requests from all `SharedBackend` clones.
    incoming: UnboundedReceiver<BackendRequest<N>>,
    /// Requests buffered until the event loop processes them.
    queued_requests: VecDeque<BackendRequest<N>>,
    /// Block all state lookups are pinned to, if any.
    block_id: Option<BlockId>,
    /// Probed account-fetch strategy (see the `ACCOUNT_FETCH_*` constants).
    account_fetch_mode: Arc<AtomicU8>,
}
198
199impl<N: Network> BackendHandler<N> {
200 fn new(
201 provider: DynProvider<N>,
202 db: BlockchainDb,
203 rx: UnboundedReceiver<BackendRequest<N>>,
204 block_id: Option<BlockId>,
205 ) -> Self {
206 Self {
207 provider,
208 db,
209 pending_requests: Default::default(),
210 account_requests: Default::default(),
211 storage_requests: Default::default(),
212 block_requests: Default::default(),
213 queued_requests: Default::default(),
214 incoming: rx,
215 block_id,
216 account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
217 }
218 }
219
    /// Handles a single request from a [`SharedBackend`]: serves cached values
    /// immediately and schedules provider fetches for cache misses.
    fn on_request(&mut self, req: BackendRequest<N>) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    // Cache hit: answer immediately.
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            // Full blocks and transactions are not cached; always fetched.
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    // Slot not cached yet; fetch it from the provider.
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            // The three update variants overwrite cache entries directly.
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }
285
    /// Schedules a storage-slot fetch, deduplicating concurrent requests for the
    /// same `(address, slot)` by registering additional listeners.
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                // A fetch for this slot is already in flight; just wait for it.
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                // Defaults to latest when no block is pinned.
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }
309
    /// Builds the provider request that fetches `(balance, nonce, code)` for `address`.
    ///
    /// On the first call the endpoint is probed: `eth_getAccountInfo` is raced against
    /// the three separate requests, and whichever strategy wins is stored in
    /// `account_fetch_mode` for all subsequent fetches.
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report, N> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);

        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let mode = Arc::clone(&self.account_fetch_mode);
        let fut = async move {
            let initial_mode = mode.load(Ordering::Relaxed);
            match initial_mode {
                ACCOUNT_FETCH_UNCHECKED => {
                    // Race the single-call endpoint against the three separate calls.
                    let acc_info_fut =
                        provider.get_account_info(address).block_id(block_id).into_future();

                    let balance_fut =
                        provider.get_balance(address).block_id(block_id).into_future();
                    let nonce_fut =
                        provider.get_transaction_count(address).block_id(block_id).into_future();
                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
                    let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
                    pin_mut!(acc_info_fut, triple_fut);

                    select! {
                        acc_info = &mut acc_info_fut => {
                            match acc_info {
                                Ok(info) => {
                                    trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
                                    mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
                                    Ok((info.balance, info.nonce, info.code))
                                }
                                Err(err) => {
                                    // Probe failed: fall back to the separate requests,
                                    // which are already running.
                                    trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
                                    Ok(triple_fut.await?)
                                }
                            }
                        }
                        triple = &mut triple_fut => {
                            // The separate requests finished first; prefer them from now on.
                            match triple {
                                Ok((balance, nonce, code)) => {
                                    mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
                                    Ok((balance, nonce, code))
                                }
                                Err(err) => Err(err.into())
                            }
                        }
                    }
                }

                ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
                    let mut res = provider
                        .get_account_info(address)
                        .block_id(block_id)
                        .into_future()
                        .await
                        .map(|info| (info.balance, info.nonce, info.code));

                    if res.is_err() {
                        // The single-call endpoint started failing; permanently
                        // downgrade to separate requests and retry once.
                        mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);

                        let balance_fut =
                            provider.get_balance(address).block_id(block_id).into_future();
                        let nonce_fut = provider
                            .get_transaction_count(address)
                            .block_id(block_id)
                            .into_future();
                        let code_fut =
                            provider.get_code_at(address).block_id(block_id).into_future();
                        res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
                    }

                    Ok(res?)
                }

                ACCOUNT_FETCH_SEPARATE_REQUESTS => {
                    let balance_fut =
                        provider.get_balance(address).block_id(block_id).into_future();
                    let nonce_fut =
                        provider.get_transaction_count(address).block_id(block_id).into_future();
                    let code_fut = provider.get_code_at(address).block_id(block_id).into_future();

                    Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
                }

                _ => unreachable!("Invalid account fetch mode"),
            }
        };

        // Tag the result with the address so listeners can be resolved on completion.
        ProviderRequest::Account(Box::pin(async move {
            let result = fut.await;
            (result, address)
        }))
    }
408
409 fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
411 match self.account_requests.entry(address) {
412 Entry::Occupied(mut entry) => {
413 entry.get_mut().push(listener);
414 }
415 Entry::Vacant(entry) => {
416 entry.insert(vec![listener]);
417 self.pending_requests.push(self.get_account_req(address));
418 }
419 }
420 }
421
422 fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender<N>) {
424 let provider = self.provider.clone();
425 let fut = Box::pin(async move {
426 let block = provider
427 .get_block(number)
428 .full()
429 .await
430 .wrap_err(format!("could not fetch block {number:?}"));
431 (sender, block, number)
432 });
433
434 self.pending_requests.push(ProviderRequest::FullBlock(fut));
435 }
436
437 fn request_transaction(&mut self, tx: B256, sender: TransactionSender<N>) {
439 let provider = self.provider.clone();
440 let fut = Box::pin(async move {
441 let block = provider
442 .get_transaction_by_hash(tx)
443 .await
444 .wrap_err_with(|| format!("could not get transaction {tx}"))
445 .and_then(|maybe| {
446 maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
447 });
448 (sender, block, tx)
449 });
450
451 self.pending_requests.push(ProviderRequest::Transaction(fut));
452 }
453
    /// Schedules a block-hash fetch by number, deduplicating concurrent requests.
    ///
    /// NOTE(review): a block the provider reports as missing resolves to
    /// `KECCAK_EMPTY` instead of an error — callers see a hash either way.
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                // A fetch for this block is already in flight; just wait for it.
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(number.into())
                        .hashes()
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header().hash()),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // Missing block: fall back to the empty hash.
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
490}
491
impl<N: Network> Future for BackendHandler<N> {
    type Output = ();

    /// Event loop of the handler.
    ///
    /// On every wakeup: drain the queued requests, pull everything available from
    /// the incoming channel, then poll each in-flight provider request. Completes
    /// only when the incoming channel closes (all `SharedBackend` senders dropped).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // Process all requests buffered on a previous iteration.
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // Drain the incoming channel into the queue without blocking.
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // Poll in-flight provider requests. Iterating in reverse with
            // `swap_remove` makes removal O(1); finished requests notify all of
            // their listeners, pending ones are pushed back.
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    // Fan the error out to every waiting listener.
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Only hash non-empty bytecode; empty code maps to KECCAK_EMPTY.
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                                account_id: None,
                            };
                            // Cache before notifying so later lookups hit the DB.
                            pin.db.accounts().write().insert(addr, acc.clone());

                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    // Fan the error out to every waiting listener.
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Cache the fetched slot.
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    // Fan the error out to every waiting listener.
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Cache the fetched hash.
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    // Full blocks and transactions have a single requester each,
                    // so the sender travels with the future.
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        // The wrapper delivers its own result on completion.
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // Still pending: keep it for the next poll.
                pin.pending_requests.push(request);
            }

            // Nothing new was queued while polling: park until the waker fires.
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}
676
/// How [`SharedBackend`] executes its blocking request/response cycle.
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Run via `tokio::task::block_in_place`, allowing blocking calls from within
    /// a multi-threaded tokio runtime.
    #[default]
    BlockInPlace,
    /// Run the closure directly, blocking the current thread.
    Block,
}
691
692impl BlockingMode {
693 pub fn run<F, R>(&self, f: F) -> R
695 where
696 F: FnOnce() -> R,
697 {
698 match self {
699 Self::BlockInPlace => tokio::task::block_in_place(f),
700 Self::Block => f(),
701 }
702 }
703}
704
/// A cloneable client that forwards database lookups to a [`BackendHandler`]
/// running on another task or thread.
#[derive(Clone, Debug)]
pub struct SharedBackend<N: Network = AnyNetwork> {
    /// Channel to the handler; once the last clone's sender is dropped the
    /// handler shuts down.
    backend: UnboundedSender<BackendRequest<N>>,
    /// Handle to the JSON block cache, used by the `flush_cache*` methods and
    /// the data accessors.
    cache: Arc<FlushJsonBlockCacheDB>,
    /// Strategy used for the blocking request/response cycle.
    blocking_mode: BlockingMode,
}
746
747impl<N: Network> SharedBackend<N> {
    /// Creates a new `SharedBackend` and spawns its [`BackendHandler`] onto the
    /// current tokio runtime. The handler stops once the last clone is dropped.
    pub async fn spawn_backend<P: Provider<N> + 'static>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self {
        let (shared, handler) = Self::new(provider, db, pin_block);
        trace!(target: "backendhandler", "spawning Backendhandler task");
        tokio::spawn(handler);
        shared
    }
764
    /// Creates a new `SharedBackend` and runs its [`BackendHandler`] on a dedicated
    /// OS thread with its own single-threaded tokio runtime, so callers do not
    /// need a runtime of their own.
    pub fn spawn_backend_thread<P: Provider<N> + 'static>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self {
        let (shared, handler) = Self::new(provider, db, pin_block);

        std::thread::Builder::new()
            .name("fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                // Drive the handler until the last `SharedBackend` is dropped.
                rt.block_on(handler);
            })
            .expect("failed to spawn thread");
        trace!(target: "backendhandler", "spawned Backendhandler thread");

        shared
    }
791
    /// Creates the client and its [`BackendHandler`]. The caller is responsible for
    /// driving the returned handler (see [`Self::spawn_backend`] and
    /// [`Self::spawn_backend_thread`]).
    pub fn new<P: Provider<N> + 'static>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> (Self, BackendHandler<N>) {
        let (backend, backend_rx) = unbounded();
        // Keep a handle on the underlying cache so it can be flushed on demand.
        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
        let handler = BackendHandler::new(provider.erased(), db, backend_rx, pin_block);
        (Self { backend, cache, blocking_mode: Default::default() }, handler)
    }
803
804 pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
806 Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
807 }
808
809 pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
811 let req = BackendRequest::SetPinnedBlock(block.into());
812 self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
813 }
814
815 pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<N::BlockResponse> {
817 self.blocking_mode.run(|| {
818 let (sender, rx) = oneshot_channel();
819 let req = BackendRequest::FullBlock(block.into(), sender);
820 self.backend.unbounded_send(req)?;
821 rx.recv()?
822 })
823 }
824
825 pub fn get_transaction(&self, tx: B256) -> DatabaseResult<N::TransactionResponse> {
827 self.blocking_mode.run(|| {
828 let (sender, rx) = oneshot_channel();
829 let req = BackendRequest::Transaction(tx, sender);
830 self.backend.unbounded_send(req)?;
831 rx.recv()?
832 })
833 }
834
835 fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
836 self.blocking_mode.run(|| {
837 let (sender, rx) = oneshot_channel();
838 let req = BackendRequest::Basic(address, sender);
839 self.backend.unbounded_send(req)?;
840 rx.recv()?.map(Some)
841 })
842 }
843
844 fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
845 self.blocking_mode.run(|| {
846 let (sender, rx) = oneshot_channel();
847 let req = BackendRequest::Storage(address, index, sender);
848 self.backend.unbounded_send(req)?;
849 rx.recv()?
850 })
851 }
852
853 fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
854 self.blocking_mode.run(|| {
855 let (sender, rx) = oneshot_channel();
856 let req = BackendRequest::BlockHash(number, sender);
857 self.backend.unbounded_send(req)?;
858 rx.recv()?
859 })
860 }
861
862 pub fn insert_or_update_address(&self, address_data: AddressData) {
864 let req = BackendRequest::UpdateAddress(address_data);
865 let err = self.backend.unbounded_send(req);
866 match err {
867 Ok(_) => (),
868 Err(e) => {
869 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
870 }
871 }
872 }
873
874 pub fn insert_or_update_storage(&self, storage_data: StorageData) {
876 let req = BackendRequest::UpdateStorage(storage_data);
877 let err = self.backend.unbounded_send(req);
878 match err {
879 Ok(_) => (),
880 Err(e) => {
881 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
882 }
883 }
884 }
885
886 pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
888 let req = BackendRequest::UpdateBlockHash(block_hash_data);
889 let err = self.backend.unbounded_send(req);
890 match err {
891 Ok(_) => (),
892 Err(e) => {
893 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
894 }
895 }
896 }
897
    /// Executes an arbitrary provider future on the handler's event loop and
    /// blocks for its result.
    ///
    /// The future is type-erased via [`AnyRequestFuture`] so it can be queued
    /// alongside the regular requests.
    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
    where
        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
        T: fmt::Debug + Send + 'static,
    {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
                sender,
                future: Box::pin(fut),
            }));
            self.backend.unbounded_send(req)?;
            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
        })
    }
914
    /// Flushes the JSON block cache to its configured location.
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the JSON block cache to the given path.
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns a handle to the underlying in-memory database.
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns a snapshot of all cached accounts.
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the number of cached accounts.
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns a snapshot of all cached storage.
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the number of addresses with cached storage.
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns a snapshot of all cached block hashes.
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the number of cached block hashes.
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
959}
960
impl<N: Network> DatabaseRef for SharedBackend<N> {
    type Error = DatabaseError;

    /// Fetches basic account info via the handler; emits the non-archive-node
    /// warning when the failure looks archive-related.
    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        trace!(target: "sharedbackend", %address, "request basic");
        self.do_get_basic(address).inspect_err(|err| {
            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
        })
    }

    /// Unsupported: `basic_ref` always populates `AccountInfo.code`, so lookups
    /// by code hash are never needed and always error.
    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
        Err(DatabaseError::MissingCode(hash))
    }

    /// Fetches a storage slot via the handler, with the same non-archive-node warning.
    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
        self.do_get_storage(address, index).inspect_err(|err| {
            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
        })
    }

    /// Fetches a block hash via the handler, with the same non-archive-node warning.
    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
        self.do_get_block_hash(number).inspect_err(|err| {
            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
        })
    }
}
998
999#[cfg(test)]
1000mod tests {
1001 use super::*;
1002 use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
1003 use alloy_consensus::BlockHeader;
1004 use alloy_provider::ProviderBuilder;
1005 use alloy_rpc_client::ClientBuilder;
1006 use serde::Deserialize;
1007 use std::{fs, path::PathBuf};
1008 use tiny_http::{Response, Server};
1009
    /// Builds an `AnyNetwork` HTTP provider for the given RPC endpoint.
    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
        ProviderBuilder::new()
            .network::<AnyNetwork>()
            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
    }
1015
    // RPC endpoint from `ETH_RPC_URL` at build time; network tests are skipped when unset.
    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
1017
    /// Checks that `BlockchainDbMeta::with_block` copies the fetched block's number
    /// into the block env.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
        let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);

        assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
    }
1028
    /// End-to-end smoke test: fetches storage, account info and a block hash through
    /// the shared backend and verifies the results land in the underlying cache,
    /// including when driven from another thread.
    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // Fixture contract address with on-chain state.
        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        // The fetched values must now be present in the shared cache.
        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        // Exercise the backend from another thread to cover cross-thread usage.
        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }
1069
    /// Ensures the bundled JSON cache fixture loads and contains account data.
    #[test]
    fn can_read_cache() {
        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
        let json = JsonBlockCacheDB::load(cache_path).unwrap();
        assert!(!json.db().accounts.read().is_empty());
    }
1076
    /// Verifies that `insert_or_update_address` overrides the cached account so that
    /// later `basic_ref` lookups — including from another thread — see the new data.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Synthetic account data that should shadow whatever is on chain.
        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
            account_id: None,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {idx}"
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in instance of index {idx}"
                        );

                        // Also check the raw DB entry to ensure the cache itself changed.
                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {idx}"
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in instance of index {idx}"
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }
1139
    /// Verifies that `insert_or_update_storage` overrides cached slots so that later
    /// storage lookups — including from another thread — see the injected values.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        // Synthetic storage slots that should shadow the on-chain values.
        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {index} in address {address} do not have the same value"
                                );

                                // Also check the raw DB entry to ensure the cache itself changed.
                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {index} in address {address} do not have the same value"
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {err}")
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }
1198
    /// Verifies that `insert_or_update_block_hashes` overrides cached hashes so that
    /// later hash lookups — including from another thread — see the injected values.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // Synthetic hashes for blocks 1-5 that should shadow the real chain.
        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {key} did not match"
                        );

                        // Also check the raw DB entry to ensure the cache itself changed.
                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {key} did not match");
                    }
                    Err(err) => panic!("Hash not found, error: {err}"),
                }
            }
        });
        handle.join().unwrap();
    }
1247
1248 #[tokio::test(flavor = "multi_thread")]
1249 async fn can_modify_storage_with_cache() {
1250 let Some(endpoint) = ENDPOINT else { return };
1251
1252 let provider = get_http_provider(endpoint);
1253 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1254
1255 fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1257
1258 let cache_path =
1259 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1260
1261 let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1262 let backend =
1263 SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1264
1265 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1267
1268 let mut storage_data = StorageData::default();
1269 let mut storage_info = StorageInfo::default();
1270 storage_info.insert(U256::from(1), U256::from(10));
1271 storage_info.insert(U256::from(2), U256::from(15));
1272 storage_info.insert(U256::from(3), U256::from(20));
1273 storage_info.insert(U256::from(4), U256::from(20));
1274 storage_info.insert(U256::from(5), U256::from(15));
1275 storage_info.insert(U256::from(6), U256::from(10));
1276
1277 let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1278 address_data.code = None;
1279
1280 storage_data.insert(address, storage_info);
1281
1282 backend.insert_or_update_storage(storage_data.clone());
1283
1284 let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1285 new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1287
1288 let mut account_data = AddressData::default();
1289 account_data.insert(address, new_acc.clone());
1290
1291 backend.insert_or_update_address(account_data);
1292
1293 let backend_clone = backend.clone();
1294
1295 let max_slots = 5;
1296 let handle = std::thread::spawn(move || {
1297 for _ in 1..max_slots {
1298 for (address, info) in &storage_data {
1299 for (index, value) in info {
1300 let result_storage = backend.do_get_storage(*address, *index);
1301 match result_storage {
1302 Ok(stg_db) => {
1303 assert_eq!(
1304 stg_db, *value,
1305 "Storage in slot number {index} in address {address} doesn't have the same value"
1306 );
1307
1308 let db_result = {
1309 let storage = db.storage().read();
1310 let address_storage = storage.get(address).unwrap();
1311 *address_storage.get(index).unwrap()
1312 };
1313
1314 assert_eq!(
1315 stg_db, db_result,
1316 "Storage in slot number {index} in address {address} doesn't have the same value"
1317 );
1318 }
1319
1320 Err(err) => {
1321 panic!("There was a database error: {err}")
1322 }
1323 }
1324 }
1325 }
1326 }
1327
1328 backend_clone.flush_cache();
1329 });
1330 handle.join().unwrap();
1331
1332 let cache_path =
1335 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1336
1337 let json_db = BlockchainDb::new(meta, Some(cache_path));
1338
1339 let mut storage_data = StorageData::default();
1340 let mut storage_info = StorageInfo::default();
1341 storage_info.insert(U256::from(1), U256::from(10));
1342 storage_info.insert(U256::from(2), U256::from(15));
1343 storage_info.insert(U256::from(3), U256::from(20));
1344 storage_info.insert(U256::from(4), U256::from(20));
1345 storage_info.insert(U256::from(5), U256::from(15));
1346 storage_info.insert(U256::from(6), U256::from(10));
1347
1348 storage_data.insert(address, storage_info);
1349
1350 let max_slots = 5;
1352 let handle = std::thread::spawn(move || {
1353 for _ in 1..max_slots {
1354 for (address, info) in &storage_data {
1355 for (index, value) in info {
1356 let result_storage = {
1357 let storage = json_db.storage().read();
1358 let address_storage = storage.get(address).unwrap().clone();
1359 *address_storage.get(index).unwrap()
1360 };
1361
1362 assert_eq!(
1363 result_storage, *value,
1364 "Storage in slot number {index} in address {address} doesn't have the same value"
1365 );
1366 }
1367 }
1368 }
1369 });
1370
1371 handle.join().unwrap();
1372
1373 fs::remove_file("test-data/storage-tmp.json").unwrap();
1375 }
1376
1377 #[tokio::test(flavor = "multi_thread")]
1378 async fn shared_backend_any_request() {
1379 let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1380 let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1381 let endpoint = format!("http://{}", server.server_addr());
1382
1383 let expected_bytes_innner = expected_response_bytes.clone();
1385 let server_handle = std::thread::spawn(move || {
1386 #[derive(Debug, Deserialize)]
1387 struct Request {
1388 method: String,
1389 }
1390 let mut request = server.recv().unwrap();
1391 let rpc_request: Request =
1392 serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1393
1394 match rpc_request.method.as_str() {
1395 "foo_callCustomMethod" => request
1396 .respond(Response::from_string(format!(
1397 r#"{{"result": "{}"}}"#,
1398 alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1399 )))
1400 .unwrap(),
1401 _ => request
1402 .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1403 .unwrap(),
1404 };
1405 });
1406
1407 let provider = get_http_provider(&endpoint);
1408 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1409
1410 let db = BlockchainDb::new(meta, None);
1411 let provider_inner = provider.clone();
1412 let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1413
1414 let actual_response_bytes = backend
1415 .do_any_request(async move {
1416 let bytes: alloy_primitives::Bytes =
1417 provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1418 Ok(bytes)
1419 })
1420 .expect("failed performing any request");
1421
1422 assert_eq!(actual_response_bytes, expected_response_bytes);
1423
1424 server_handle.join().unwrap();
1425 }
1426}