1use crate::{
4 cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5 error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9 network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
10 Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16 pin_mut,
17 stream::Stream,
18 task::{Context, Poll},
19 Future, FutureExt,
20};
21use revm::{
22 database::DatabaseRef,
23 primitives::{
24 map::{hash_map::Entry, AddressHashMap, HashMap},
25 KECCAK_EMPTY,
26 },
27 state::{AccountInfo, Bytecode},
28};
29use std::{
30 collections::VecDeque,
31 fmt,
32 future::IntoFuture,
33 path::Path,
34 pin::Pin,
35 sync::{
36 atomic::{AtomicU8, Ordering},
37 mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38 Arc,
39 },
40};
41use tokio::select;
42
/// Hint that is logged alongside a failed request when the error looks like it was caused by
/// asking a non-archive node for historical state.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";
47
/// In-flight account lookup: yields the fetched `(balance, nonce, code)` triple (or the error)
/// together with the address that was queried.
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
/// In-flight storage lookup: yields the slot value (or the error), the address and the slot index.
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
/// In-flight block hash lookup: yields the hash (or the error) and the block number.
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
/// In-flight full block lookup: carries the sender to answer on, the optional block (or the
/// error) and the requested block id.
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
/// In-flight transaction lookup: carries the sender to answer on, the transaction (or the error)
/// and the requested transaction hash.
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

/// Channel end used to deliver the result of an account request.
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
/// Channel end used to deliver the result of a storage request.
type StorageSender = OneshotSender<DatabaseResult<U256>>;
/// Channel end used to deliver the result of a block hash request.
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
/// Channel end used to deliver the result of a full block request.
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
/// Channel end used to deliver the result of a transaction request.
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

/// Account info keyed by address, as accepted by the cache update requests.
type AddressData = AddressHashMap<AccountInfo>;
/// Per-address storage maps, as accepted by the cache update requests.
type StorageData = AddressHashMap<StorageInfo>;
/// Block hashes keyed by block number, as accepted by the cache update requests.
type BlockHashData = HashMap<U256, B256>;
69
/// Initial account fetch mode: `get_account_info` is raced against the three separate
/// balance / nonce / code requests to find out what the endpoint supports.
const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
/// A `get_account_info` call succeeded, so account data is fetched with that single request.
const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
/// Account data is fetched with separate balance, nonce and code requests.
const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
77
/// An arbitrary request future paired with the channel its output is delivered on.
struct AnyRequestFuture<T, Err> {
    /// Receives the future's output once it has completed.
    sender: OneshotSender<Result<T, Err>>,
    /// The boxed future that is driven to completion by the handler.
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}
82
83impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
86 }
87}
88
/// Type-erased view of an [`AnyRequestFuture`] so requests with different output types can be
/// stored and polled through the same boxed trait object.
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    /// Polls the wrapped request; returns `Poll::Ready(())` once it has finished.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
92
93impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
102where
103 T: fmt::Debug + Send + 'static,
104 Err: fmt::Debug + Send + 'static,
105{
106 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
107 match self.future.poll_unpin(cx) {
108 Poll::Ready(result) => {
109 let _ = self.sender.send(result);
110 Poll::Ready(())
111 }
112 Poll::Pending => Poll::Pending,
113 }
114 }
115}
116
/// A request that has been dispatched to the provider and is waiting to complete.
enum ProviderRequest<Err> {
    /// Fetching balance, nonce and code of an account.
    Account(AccountFuture<Err>),
    /// Fetching a single storage slot.
    Storage(StorageFuture<Err>),
    /// Fetching the hash of a block by number.
    BlockHash(BlockHashFuture<Err>),
    /// Fetching a full block.
    FullBlock(FullBlockFuture<Err>),
    /// Fetching a transaction by hash.
    Transaction(TransactionFuture<Err>),
    /// An arbitrary, type-erased request.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
126
/// Messages sent from a `SharedBackend` to the `BackendHandler`.
#[derive(Debug)]
enum BackendRequest {
    /// Look up the account info of an address and answer on the sender.
    Basic(Address, AccountInfoSender),
    /// Look up a storage slot of an address and answer on the sender.
    Storage(Address, U256, StorageSender),
    /// Look up the hash of the block with the given number and answer on the sender.
    BlockHash(u64, BlockHashSender),
    /// Fetch a full block and answer on the sender.
    FullBlock(BlockId, FullBlockSender),
    /// Fetch a transaction by hash and answer on the sender.
    Transaction(B256, TransactionSender),
    /// Change the block that subsequent account and storage requests are made against.
    SetPinnedBlock(BlockId),

    /// Insert the given accounts into the cache.
    UpdateAddress(AddressData),
    /// Insert the given per-address storage maps into the cache.
    UpdateStorage(StorageData),
    /// Insert the given block hashes into the cache.
    UpdateBlockHash(BlockHashData),
    /// Drive an arbitrary, type-erased request to completion.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
152
/// Future that receives `BackendRequest`s over a channel, answers them from the cache when
/// possible and otherwise fetches the data through the provider.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    /// Provider used to issue the remote requests.
    provider: P,
    /// Cache that is consulted before, and filled after, each remote request.
    db: BlockchainDb,
    /// Requests that were handed to the provider and have not completed yet.
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners waiting for an in-flight account request, keyed by address.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for an in-flight storage request, keyed by address and slot.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for an in-flight block hash request, keyed by block number.
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Receiving end of the channel on which requests arrive.
    incoming: UnboundedReceiver<BackendRequest>,
    /// Requests drained from `incoming` that still have to be processed.
    queued_requests: VecDeque<BackendRequest>,
    /// Block that account and storage requests are made against; the `BlockId` default is
    /// used when unset.
    block_id: Option<BlockId>,
    /// One of the `ACCOUNT_FETCH_*` values, shared with the in-flight account futures.
    account_fetch_mode: Arc<AtomicU8>,
}
180
181impl<P> BackendHandler<P>
182where
183 P: Provider<AnyNetwork> + Clone + Unpin + 'static,
184{
185 fn new(
186 provider: P,
187 db: BlockchainDb,
188 rx: UnboundedReceiver<BackendRequest>,
189 block_id: Option<BlockId>,
190 ) -> Self {
191 Self {
192 provider,
193 db,
194 pending_requests: Default::default(),
195 account_requests: Default::default(),
196 storage_requests: Default::default(),
197 block_requests: Default::default(),
198 queued_requests: Default::default(),
199 incoming: rx,
200 block_id,
201 account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
202 }
203 }
204
205 fn on_request(&mut self, req: BackendRequest) {
212 match req {
213 BackendRequest::Basic(addr, sender) => {
214 trace!(target: "backendhandler", "received request basic address={:?}", addr);
215 let acc = self.db.accounts().read().get(&addr).cloned();
216 if let Some(basic) = acc {
217 let _ = sender.send(Ok(basic));
218 } else {
219 self.request_account(addr, sender);
220 }
221 }
222 BackendRequest::BlockHash(number, sender) => {
223 let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
224 if let Some(hash) = hash {
225 let _ = sender.send(Ok(hash));
226 } else {
227 self.request_hash(number, sender);
228 }
229 }
230 BackendRequest::FullBlock(number, sender) => {
231 self.request_full_block(number, sender);
232 }
233 BackendRequest::Transaction(tx, sender) => {
234 self.request_transaction(tx, sender);
235 }
236 BackendRequest::Storage(addr, idx, sender) => {
237 let value =
239 self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
240 if let Some(value) = value {
241 let _ = sender.send(Ok(value));
242 } else {
243 self.request_account_storage(addr, idx, sender);
245 }
246 }
247 BackendRequest::SetPinnedBlock(block_id) => {
248 self.block_id = Some(block_id);
249 }
250 BackendRequest::UpdateAddress(address_data) => {
251 for (address, data) in address_data {
252 self.db.accounts().write().insert(address, data);
253 }
254 }
255 BackendRequest::UpdateStorage(storage_data) => {
256 for (address, data) in storage_data {
257 self.db.storage().write().insert(address, data);
258 }
259 }
260 BackendRequest::UpdateBlockHash(block_hash_data) => {
261 for (block, hash) in block_hash_data {
262 self.db.block_hashes().write().insert(block, hash);
263 }
264 }
265 BackendRequest::AnyRequest(fut) => {
266 self.pending_requests.push(ProviderRequest::AnyRequest(fut));
267 }
268 }
269 }
270
271 fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
273 match self.storage_requests.entry((address, idx)) {
274 Entry::Occupied(mut entry) => {
275 entry.get_mut().push(listener);
276 }
277 Entry::Vacant(entry) => {
278 trace!(target: "backendhandler", %address, %idx, "preparing storage request");
279 entry.insert(vec![listener]);
280 let provider = self.provider.clone();
281 let block_id = self.block_id.unwrap_or_default();
282 let fut = Box::pin(async move {
283 let storage = provider
284 .get_storage_at(address, idx)
285 .block_id(block_id)
286 .await
287 .map_err(Into::into);
288 (storage, address, idx)
289 });
290 self.pending_requests.push(ProviderRequest::Storage(fut));
291 }
292 }
293 }
294
295 fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
297 trace!(target: "backendhandler", "preparing account request, address={:?}", address);
298
299 let provider = self.provider.clone();
300 let block_id = self.block_id.unwrap_or_default();
301 let mode = Arc::clone(&self.account_fetch_mode);
302 let fut = async move {
303 let initial_mode = mode.load(Ordering::Relaxed);
305 match initial_mode {
306 ACCOUNT_FETCH_UNCHECKED => {
307 let acc_info_fut =
309 provider.get_account_info(address).block_id(block_id).into_future();
310
311 let balance_fut =
313 provider.get_balance(address).block_id(block_id).into_future();
314 let nonce_fut =
315 provider.get_transaction_count(address).block_id(block_id).into_future();
316 let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
317 let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
318 pin_mut!(acc_info_fut, triple_fut);
319
320 select! {
321 acc_info = &mut acc_info_fut => {
322 match acc_info {
323 Ok(info) => {
324 trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
325 mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
326 Ok((info.balance, info.nonce, info.code))
327 }
328 Err(err) => {
329 trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
330 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
331 Ok(triple_fut.await?)
332 }
333 }
334 }
335 triple = &mut triple_fut => {
336 match triple {
337 Ok((balance, nonce, code)) => {
338 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
339 Ok((balance, nonce, code))
340 }
341 Err(err) => Err(err.into())
342 }
343 }
344 }
345 }
346
347 ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
348 let mut res = provider
349 .get_account_info(address)
350 .block_id(block_id)
351 .into_future()
352 .await
353 .map(|info| (info.balance, info.nonce, info.code));
354
355 if res.is_err() {
358 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
359
360 let balance_fut =
361 provider.get_balance(address).block_id(block_id).into_future();
362 let nonce_fut = provider
363 .get_transaction_count(address)
364 .block_id(block_id)
365 .into_future();
366 let code_fut =
367 provider.get_code_at(address).block_id(block_id).into_future();
368 res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
369 }
370
371 Ok(res?)
372 }
373
374 ACCOUNT_FETCH_SEPARATE_REQUESTS => {
375 let balance_fut =
376 provider.get_balance(address).block_id(block_id).into_future();
377 let nonce_fut =
378 provider.get_transaction_count(address).block_id(block_id).into_future();
379 let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
380
381 Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
382 }
383
384 _ => unreachable!("Invalid account fetch mode"),
385 }
386 };
387
388 ProviderRequest::Account(Box::pin(async move {
389 let result = fut.await;
390 (result, address)
391 }))
392 }
393
394 fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
396 match self.account_requests.entry(address) {
397 Entry::Occupied(mut entry) => {
398 entry.get_mut().push(listener);
399 }
400 Entry::Vacant(entry) => {
401 entry.insert(vec![listener]);
402 self.pending_requests.push(self.get_account_req(address));
403 }
404 }
405 }
406
407 fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
409 let provider = self.provider.clone();
410 let fut = Box::pin(async move {
411 let block = provider
412 .get_block(number)
413 .full()
414 .await
415 .wrap_err(format!("could not fetch block {number:?}"));
416 (sender, block, number)
417 });
418
419 self.pending_requests.push(ProviderRequest::FullBlock(fut));
420 }
421
422 fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
424 let provider = self.provider.clone();
425 let fut = Box::pin(async move {
426 let block = provider
427 .get_transaction_by_hash(tx)
428 .await
429 .wrap_err_with(|| format!("could not get transaction {tx}"))
430 .and_then(|maybe| {
431 maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
432 });
433 (sender, block, tx)
434 });
435
436 self.pending_requests.push(ProviderRequest::Transaction(fut));
437 }
438
439 fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
441 match self.block_requests.entry(number) {
442 Entry::Occupied(mut entry) => {
443 entry.get_mut().push(listener);
444 }
445 Entry::Vacant(entry) => {
446 trace!(target: "backendhandler", number, "preparing block hash request");
447 entry.insert(vec![listener]);
448 let provider = self.provider.clone();
449 let fut = Box::pin(async move {
450 let block = provider
451 .get_block_by_number(number.into())
452 .hashes()
453 .await
454 .wrap_err("failed to get block");
455
456 let block_hash = match block {
457 Ok(Some(block)) => Ok(block.header.hash),
458 Ok(None) => {
459 warn!(target: "backendhandler", ?number, "block not found");
460 Ok(KECCAK_EMPTY)
463 }
464 Err(err) => {
465 error!(target: "backendhandler", %err, ?number, "failed to get block");
466 Err(err)
467 }
468 };
469 (block_hash, number)
470 });
471 self.pending_requests.push(ProviderRequest::BlockHash(fut));
472 }
473 }
474 }
475}
476
477impl<P> Future for BackendHandler<P>
478where
479 P: Provider<AnyNetwork> + Clone + Unpin + 'static,
480{
481 type Output = ();
482
483 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
484 let pin = self.get_mut();
485 loop {
486 while let Some(req) = pin.queued_requests.pop_front() {
488 pin.on_request(req)
489 }
490
491 loop {
493 match Pin::new(&mut pin.incoming).poll_next(cx) {
494 Poll::Ready(Some(req)) => {
495 pin.queued_requests.push_back(req);
496 }
497 Poll::Ready(None) => {
498 trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
499 return Poll::Ready(());
500 }
501 Poll::Pending => break,
502 }
503 }
504
505 for n in (0..pin.pending_requests.len()).rev() {
507 let mut request = pin.pending_requests.swap_remove(n);
508 match &mut request {
509 ProviderRequest::Account(fut) => {
510 if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
511 let (balance, nonce, code) = match resp {
513 Ok(res) => res,
514 Err(err) => {
515 let err = Arc::new(err);
516 if let Some(listeners) = pin.account_requests.remove(&addr) {
517 listeners.into_iter().for_each(|l| {
518 let _ = l.send(Err(DatabaseError::GetAccount(
519 addr,
520 Arc::clone(&err),
521 )));
522 })
523 }
524 continue;
525 }
526 };
527
528 let (code, code_hash) = if !code.is_empty() {
530 (code.clone(), keccak256(&code))
531 } else {
532 (Bytes::default(), KECCAK_EMPTY)
533 };
534
535 let acc = AccountInfo {
537 nonce,
538 balance,
539 code: Some(Bytecode::new_raw(code)),
540 code_hash,
541 };
542 pin.db.accounts().write().insert(addr, acc.clone());
543
544 if let Some(listeners) = pin.account_requests.remove(&addr) {
546 listeners.into_iter().for_each(|l| {
547 let _ = l.send(Ok(acc.clone()));
548 })
549 }
550 continue;
551 }
552 }
553 ProviderRequest::Storage(fut) => {
554 if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
555 let value = match resp {
556 Ok(value) => value,
557 Err(err) => {
558 let err = Arc::new(err);
560 if let Some(listeners) =
561 pin.storage_requests.remove(&(addr, idx))
562 {
563 listeners.into_iter().for_each(|l| {
564 let _ = l.send(Err(DatabaseError::GetStorage(
565 addr,
566 idx,
567 Arc::clone(&err),
568 )));
569 })
570 }
571 continue;
572 }
573 };
574
575 pin.db.storage().write().entry(addr).or_default().insert(idx, value);
577
578 if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
580 listeners.into_iter().for_each(|l| {
581 let _ = l.send(Ok(value));
582 })
583 }
584 continue;
585 }
586 }
587 ProviderRequest::BlockHash(fut) => {
588 if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
589 let value = match block_hash {
590 Ok(value) => value,
591 Err(err) => {
592 let err = Arc::new(err);
593 if let Some(listeners) = pin.block_requests.remove(&number) {
595 listeners.into_iter().for_each(|l| {
596 let _ = l.send(Err(DatabaseError::GetBlockHash(
597 number,
598 Arc::clone(&err),
599 )));
600 })
601 }
602 continue;
603 }
604 };
605
606 pin.db.block_hashes().write().insert(U256::from(number), value);
608
609 if let Some(listeners) = pin.block_requests.remove(&number) {
611 listeners.into_iter().for_each(|l| {
612 let _ = l.send(Ok(value));
613 })
614 }
615 continue;
616 }
617 }
618 ProviderRequest::FullBlock(fut) => {
619 if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
620 let msg = match resp {
621 Ok(Some(block)) => Ok(block),
622 Ok(None) => Err(DatabaseError::BlockNotFound(number)),
623 Err(err) => {
624 let err = Arc::new(err);
625 Err(DatabaseError::GetFullBlock(number, err))
626 }
627 };
628 let _ = sender.send(msg);
629 continue;
630 }
631 }
632 ProviderRequest::Transaction(fut) => {
633 if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
634 let msg = match tx {
635 Ok(tx) => Ok(tx),
636 Err(err) => {
637 let err = Arc::new(err);
638 Err(DatabaseError::GetTransaction(tx_hash, err))
639 }
640 };
641 let _ = sender.send(msg);
642 continue;
643 }
644 }
645 ProviderRequest::AnyRequest(fut) => {
646 if fut.poll_inner(cx).is_ready() {
647 continue;
648 }
649 }
650 }
651 pin.pending_requests.push(request);
653 }
654
655 if pin.queued_requests.is_empty() {
658 return Poll::Pending;
659 }
660 }
661 }
662}
663
/// How a `SharedBackend` runs its blocking request/response round trip.
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Run the blocking section inside `tokio::task::block_in_place` (the default).
    #[default]
    BlockInPlace,
    /// Run the blocking section directly on the calling thread.
    Block,
}
678
679impl BlockingMode {
680 pub fn run<F, R>(&self, f: F) -> R
682 where
683 F: FnOnce() -> R,
684 {
685 match self {
686 Self::BlockInPlace => tokio::task::block_in_place(f),
687 Self::Block => f(),
688 }
689 }
690}
691
/// Cloneable, synchronous handle for talking to a `BackendHandler`.
///
/// Every lookup sends a `BackendRequest` over the channel and blocks on a one-shot style
/// response channel until the handler has answered.
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// Sending end of the channel the handler listens on.
    backend: UnboundedSender<BackendRequest>,
    /// Shared handle to the cache that backs the handler's database.
    /// NOTE(review): presumably flushes the JSON cache when the last clone is dropped — confirm
    /// against `FlushJsonBlockCacheDB`.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// How blocking lookups are executed.
    blocking_mode: BlockingMode,
}
733
734impl SharedBackend {
735 pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
743 where
744 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
745 {
746 let (shared, handler) = Self::new(provider, db, pin_block);
747 trace!(target: "backendhandler", "spawning Backendhandler task");
749 tokio::spawn(handler);
750 shared
751 }
752
753 pub fn spawn_backend_thread<P>(
756 provider: P,
757 db: BlockchainDb,
758 pin_block: Option<BlockId>,
759 ) -> Self
760 where
761 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
762 {
763 let (shared, handler) = Self::new(provider, db, pin_block);
764
765 std::thread::Builder::new()
768 .name("fork-backend".into())
769 .spawn(move || {
770 let rt = tokio::runtime::Builder::new_current_thread()
771 .enable_all()
772 .build()
773 .expect("failed to build tokio runtime");
774
775 rt.block_on(handler);
776 })
777 .expect("failed to spawn thread");
778 trace!(target: "backendhandler", "spawned Backendhandler thread");
779
780 shared
781 }
782
783 pub fn new<P>(
785 provider: P,
786 db: BlockchainDb,
787 pin_block: Option<BlockId>,
788 ) -> (Self, BackendHandler<P>)
789 where
790 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
791 {
792 let (backend, backend_rx) = unbounded();
793 let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
794 let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
795 (Self { backend, cache, blocking_mode: Default::default() }, handler)
796 }
797
798 pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
800 Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
801 }
802
803 pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
805 let req = BackendRequest::SetPinnedBlock(block.into());
806 self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
807 }
808
809 pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
811 self.blocking_mode.run(|| {
812 let (sender, rx) = oneshot_channel();
813 let req = BackendRequest::FullBlock(block.into(), sender);
814 self.backend.unbounded_send(req)?;
815 rx.recv()?
816 })
817 }
818
819 pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
821 self.blocking_mode.run(|| {
822 let (sender, rx) = oneshot_channel();
823 let req = BackendRequest::Transaction(tx, sender);
824 self.backend.unbounded_send(req)?;
825 rx.recv()?
826 })
827 }
828
829 fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
830 self.blocking_mode.run(|| {
831 let (sender, rx) = oneshot_channel();
832 let req = BackendRequest::Basic(address, sender);
833 self.backend.unbounded_send(req)?;
834 rx.recv()?.map(Some)
835 })
836 }
837
838 fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
839 self.blocking_mode.run(|| {
840 let (sender, rx) = oneshot_channel();
841 let req = BackendRequest::Storage(address, index, sender);
842 self.backend.unbounded_send(req)?;
843 rx.recv()?
844 })
845 }
846
847 fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
848 self.blocking_mode.run(|| {
849 let (sender, rx) = oneshot_channel();
850 let req = BackendRequest::BlockHash(number, sender);
851 self.backend.unbounded_send(req)?;
852 rx.recv()?
853 })
854 }
855
856 pub fn insert_or_update_address(&self, address_data: AddressData) {
858 let req = BackendRequest::UpdateAddress(address_data);
859 let err = self.backend.unbounded_send(req);
860 match err {
861 Ok(_) => (),
862 Err(e) => {
863 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
864 }
865 }
866 }
867
868 pub fn insert_or_update_storage(&self, storage_data: StorageData) {
870 let req = BackendRequest::UpdateStorage(storage_data);
871 let err = self.backend.unbounded_send(req);
872 match err {
873 Ok(_) => (),
874 Err(e) => {
875 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
876 }
877 }
878 }
879
880 pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
882 let req = BackendRequest::UpdateBlockHash(block_hash_data);
883 let err = self.backend.unbounded_send(req);
884 match err {
885 Ok(_) => (),
886 Err(e) => {
887 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
888 }
889 }
890 }
891
892 pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
894 where
895 F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
896 T: fmt::Debug + Send + 'static,
897 {
898 self.blocking_mode.run(|| {
899 let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
900 let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
901 sender,
902 future: Box::pin(fut),
903 }));
904 self.backend.unbounded_send(req)?;
905 rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
906 })
907 }
908
909 pub fn flush_cache(&self) {
911 self.cache.0.flush();
912 }
913
914 pub fn flush_cache_to(&self, cache_path: &Path) {
916 self.cache.0.flush_to(cache_path);
917 }
918
919 pub fn data(&self) -> Arc<MemDb> {
921 self.cache.0.db().clone()
922 }
923
924 pub fn accounts(&self) -> AddressData {
926 self.cache.0.db().accounts.read().clone()
927 }
928
929 pub fn accounts_len(&self) -> usize {
931 self.cache.0.db().accounts.read().len()
932 }
933
934 pub fn storage(&self) -> StorageData {
936 self.cache.0.db().storage.read().clone()
937 }
938
939 pub fn storage_len(&self) -> usize {
941 self.cache.0.db().storage.read().len()
942 }
943
944 pub fn block_hashes(&self) -> BlockHashData {
946 self.cache.0.db().block_hashes.read().clone()
947 }
948
949 pub fn block_hashes_len(&self) -> usize {
951 self.cache.0.db().block_hashes.read().len()
952 }
953}
954
955impl DatabaseRef for SharedBackend {
956 type Error = DatabaseError;
957
958 fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
959 trace!(target: "sharedbackend", %address, "request basic");
960 self.do_get_basic(address).map_err(|err| {
961 error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
962 if err.is_possibly_non_archive_node_error() {
963 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
964 }
965 err
966 })
967 }
968
969 fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
970 Err(DatabaseError::MissingCode(hash))
971 }
972
973 fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
974 trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
975 self.do_get_storage(address, index).map_err(|err| {
976 error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
977 if err.is_possibly_non_archive_node_error() {
978 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
979 }
980 err
981 })
982 }
983
984 fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
985 trace!(target: "sharedbackend", "request block hash for number {:?}", number);
986 self.do_get_block_hash(number).map_err(|err| {
987 error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
988 if err.is_possibly_non_archive_node_error() {
989 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
990 }
991 err
992 })
993 }
994}
995
996#[cfg(test)]
997mod tests {
998 use super::*;
999 use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
1000 use alloy_consensus::BlockHeader;
1001 use alloy_provider::ProviderBuilder;
1002 use alloy_rpc_client::ClientBuilder;
1003 use serde::Deserialize;
1004 use std::{fs, path::PathBuf};
1005 use tiny_http::{Response, Server};
1006
1007 pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
1008 ProviderBuilder::new()
1009 .network::<AnyNetwork>()
1010 .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
1011 }
1012
    /// RPC endpoint read from `ETH_RPC_URL` at compile time; tests that need a live node return
    /// early when it is not set.
    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
1014
1015 #[tokio::test(flavor = "multi_thread")]
1016 async fn test_builder() {
1017 let Some(endpoint) = ENDPOINT else { return };
1018 let provider = get_http_provider(endpoint);
1019
1020 let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
1021 let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
1022
1023 assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
1024 }
1025
1026 #[tokio::test(flavor = "multi_thread")]
1027 async fn shared_backend() {
1028 let Some(endpoint) = ENDPOINT else { return };
1029
1030 let provider = get_http_provider(endpoint);
1031 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1032
1033 let db = BlockchainDb::new(meta, None);
1034 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1035
1036 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1038
1039 let idx = U256::from(0u64);
1040 let value = backend.storage_ref(address, idx).unwrap();
1041 let account = backend.basic_ref(address).unwrap().unwrap();
1042
1043 let mem_acc = db.accounts().read().get(&address).unwrap().clone();
1044 assert_eq!(account.balance, mem_acc.balance);
1045 assert_eq!(account.nonce, mem_acc.nonce);
1046 let slots = db.storage().read().get(&address).unwrap().clone();
1047 assert_eq!(slots.len(), 1);
1048 assert_eq!(slots.get(&idx).copied().unwrap(), value);
1049
1050 let num = 10u64;
1051 let hash = backend.block_hash_ref(num).unwrap();
1052 let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
1053 assert_eq!(hash, mem_hash);
1054
1055 let max_slots = 5;
1056 let handle = std::thread::spawn(move || {
1057 for i in 1..max_slots {
1058 let idx = U256::from(i);
1059 let _ = backend.storage_ref(address, idx);
1060 }
1061 });
1062 handle.join().unwrap();
1063 let slots = db.storage().read().get(&address).unwrap().clone();
1064 assert_eq!(slots.len() as u64, max_slots);
1065 }
1066
1067 #[test]
1068 fn can_read_cache() {
1069 let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
1070 let json = JsonBlockCacheDB::load(cache_path).unwrap();
1071 assert!(!json.db().accounts.read().is_empty());
1072 }
1073
1074 #[tokio::test(flavor = "multi_thread")]
1075 async fn can_modify_address() {
1076 let Some(endpoint) = ENDPOINT else { return };
1077
1078 let provider = get_http_provider(endpoint);
1079 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1080
1081 let db = BlockchainDb::new(meta, None);
1082 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1083
1084 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1086
1087 let new_acc = AccountInfo {
1088 nonce: 1000u64,
1089 balance: U256::from(2000),
1090 code: None,
1091 code_hash: KECCAK_EMPTY,
1092 };
1093 let mut account_data = AddressData::default();
1094 account_data.insert(address, new_acc.clone());
1095
1096 backend.insert_or_update_address(account_data);
1097
1098 let max_slots = 5;
1099 let handle = std::thread::spawn(move || {
1100 for i in 1..max_slots {
1101 let idx = U256::from(i);
1102 let result_address = backend.basic_ref(address).unwrap();
1103 match result_address {
1104 Some(acc) => {
1105 assert_eq!(
1106 acc.nonce, new_acc.nonce,
1107 "The nonce was not changed in instance of index {idx}"
1108 );
1109 assert_eq!(
1110 acc.balance, new_acc.balance,
1111 "The balance was not changed in instance of index {idx}"
1112 );
1113
1114 let db_address = {
1116 let accounts = db.accounts().read();
1117 accounts.get(&address).unwrap().clone()
1118 };
1119
1120 assert_eq!(
1121 db_address.nonce, new_acc.nonce,
1122 "The nonce was not changed in instance of index {idx}"
1123 );
1124 assert_eq!(
1125 db_address.balance, new_acc.balance,
1126 "The balance was not changed in instance of index {idx}"
1127 );
1128 }
1129 None => panic!("Account not found"),
1130 }
1131 }
1132 });
1133 handle.join().unwrap();
1134 }
1135
1136 #[tokio::test(flavor = "multi_thread")]
1137 async fn can_modify_storage() {
1138 let Some(endpoint) = ENDPOINT else { return };
1139
1140 let provider = get_http_provider(endpoint);
1141 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1142
1143 let db = BlockchainDb::new(meta, None);
1144 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1145
1146 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1148
1149 let mut storage_data = StorageData::default();
1150 let mut storage_info = StorageInfo::default();
1151 storage_info.insert(U256::from(20), U256::from(10));
1152 storage_info.insert(U256::from(30), U256::from(15));
1153 storage_info.insert(U256::from(40), U256::from(20));
1154
1155 storage_data.insert(address, storage_info);
1156
1157 backend.insert_or_update_storage(storage_data.clone());
1158
1159 let max_slots = 5;
1160 let handle = std::thread::spawn(move || {
1161 for _ in 1..max_slots {
1162 for (address, info) in &storage_data {
1163 for (index, value) in info {
1164 let result_storage = backend.do_get_storage(*address, *index);
1165 match result_storage {
1166 Ok(stg_db) => {
1167 assert_eq!(
1168 stg_db, *value,
1169 "Storage in slot number {index} in address {address} do not have the same value"
1170 );
1171
1172 let db_result = {
1173 let storage = db.storage().read();
1174 let address_storage = storage.get(address).unwrap();
1175 *address_storage.get(index).unwrap()
1176 };
1177
1178 assert_eq!(
1179 stg_db, db_result,
1180 "Storage in slot number {index} in address {address} do not have the same value"
1181 )
1182 }
1183
1184 Err(err) => {
1185 panic!("There was a database error: {err}")
1186 }
1187 }
1188 }
1189 }
1190 }
1191 });
1192 handle.join().unwrap();
1193 }
1194
1195 #[tokio::test(flavor = "multi_thread")]
1196 async fn can_modify_block_hashes() {
1197 let Some(endpoint) = ENDPOINT else { return };
1198
1199 let provider = get_http_provider(endpoint);
1200 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1201
1202 let db = BlockchainDb::new(meta, None);
1203 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1204
1205 let mut block_hash_data = BlockHashData::default();
1209 block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1210 block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1211 block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1212 block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1213 block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1214
1215 backend.insert_or_update_block_hashes(block_hash_data.clone());
1216
1217 let max_slots: u64 = 5;
1218 let handle = std::thread::spawn(move || {
1219 for i in 1..max_slots {
1220 let key = U256::from(i);
1221 let result_hash = backend.do_get_block_hash(i);
1222 match result_hash {
1223 Ok(hash) => {
1224 assert_eq!(
1225 hash,
1226 *block_hash_data.get(&key).unwrap(),
1227 "The hash in block {key} did not match"
1228 );
1229
1230 let db_result = {
1231 let hashes = db.block_hashes().read();
1232 *hashes.get(&key).unwrap()
1233 };
1234
1235 assert_eq!(hash, db_result, "The hash in block {key} did not match");
1236 }
1237 Err(err) => panic!("Hash not found, error: {err}"),
1238 }
1239 }
1240 });
1241 handle.join().unwrap();
1242 }
1243
1244 #[tokio::test(flavor = "multi_thread")]
1245 async fn can_modify_storage_with_cache() {
1246 let Some(endpoint) = ENDPOINT else { return };
1247
1248 let provider = get_http_provider(endpoint);
1249 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1250
1251 fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1253
1254 let cache_path =
1255 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1256
1257 let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1258 let backend =
1259 SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1260
1261 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1263
1264 let mut storage_data = StorageData::default();
1265 let mut storage_info = StorageInfo::default();
1266 storage_info.insert(U256::from(1), U256::from(10));
1267 storage_info.insert(U256::from(2), U256::from(15));
1268 storage_info.insert(U256::from(3), U256::from(20));
1269 storage_info.insert(U256::from(4), U256::from(20));
1270 storage_info.insert(U256::from(5), U256::from(15));
1271 storage_info.insert(U256::from(6), U256::from(10));
1272
1273 let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1274 address_data.code = None;
1275
1276 storage_data.insert(address, storage_info);
1277
1278 backend.insert_or_update_storage(storage_data.clone());
1279
1280 let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1281 new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1283
1284 let mut account_data = AddressData::default();
1285 account_data.insert(address, new_acc.clone());
1286
1287 backend.insert_or_update_address(account_data);
1288
1289 let backend_clone = backend.clone();
1290
1291 let max_slots = 5;
1292 let handle = std::thread::spawn(move || {
1293 for _ in 1..max_slots {
1294 for (address, info) in &storage_data {
1295 for (index, value) in info {
1296 let result_storage = backend.do_get_storage(*address, *index);
1297 match result_storage {
1298 Ok(stg_db) => {
1299 assert_eq!(
1300 stg_db, *value,
1301 "Storage in slot number {index} in address {address} doesn't have the same value"
1302 );
1303
1304 let db_result = {
1305 let storage = db.storage().read();
1306 let address_storage = storage.get(address).unwrap();
1307 *address_storage.get(index).unwrap()
1308 };
1309
1310 assert_eq!(
1311 stg_db, db_result,
1312 "Storage in slot number {index} in address {address} doesn't have the same value"
1313 );
1314 }
1315
1316 Err(err) => {
1317 panic!("There was a database error: {err}")
1318 }
1319 }
1320 }
1321 }
1322 }
1323
1324 backend_clone.flush_cache();
1325 });
1326 handle.join().unwrap();
1327
1328 let cache_path =
1331 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1332
1333 let json_db = BlockchainDb::new(meta, Some(cache_path));
1334
1335 let mut storage_data = StorageData::default();
1336 let mut storage_info = StorageInfo::default();
1337 storage_info.insert(U256::from(1), U256::from(10));
1338 storage_info.insert(U256::from(2), U256::from(15));
1339 storage_info.insert(U256::from(3), U256::from(20));
1340 storage_info.insert(U256::from(4), U256::from(20));
1341 storage_info.insert(U256::from(5), U256::from(15));
1342 storage_info.insert(U256::from(6), U256::from(10));
1343
1344 storage_data.insert(address, storage_info);
1345
1346 let max_slots = 5;
1348 let handle = std::thread::spawn(move || {
1349 for _ in 1..max_slots {
1350 for (address, info) in &storage_data {
1351 for (index, value) in info {
1352 let result_storage = {
1353 let storage = json_db.storage().read();
1354 let address_storage = storage.get(address).unwrap().clone();
1355 *address_storage.get(index).unwrap()
1356 };
1357
1358 assert_eq!(
1359 result_storage, *value,
1360 "Storage in slot number {index} in address {address} doesn't have the same value"
1361 );
1362 }
1363 }
1364 }
1365 });
1366
1367 handle.join().unwrap();
1368
1369 fs::remove_file("test-data/storage-tmp.json").unwrap();
1371 }
1372
1373 #[tokio::test(flavor = "multi_thread")]
1374 async fn shared_backend_any_request() {
1375 let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1376 let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1377 let endpoint = format!("http://{}", server.server_addr());
1378
1379 let expected_bytes_innner = expected_response_bytes.clone();
1381 let server_handle = std::thread::spawn(move || {
1382 #[derive(Debug, Deserialize)]
1383 struct Request {
1384 method: String,
1385 }
1386 let mut request = server.recv().unwrap();
1387 let rpc_request: Request =
1388 serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1389
1390 match rpc_request.method.as_str() {
1391 "foo_callCustomMethod" => request
1392 .respond(Response::from_string(format!(
1393 r#"{{"result": "{}"}}"#,
1394 alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1395 )))
1396 .unwrap(),
1397 _ => request
1398 .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1399 .unwrap(),
1400 };
1401 });
1402
1403 let provider = get_http_provider(&endpoint);
1404 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1405
1406 let db = BlockchainDb::new(meta, None);
1407 let provider_inner = provider.clone();
1408 let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1409
1410 let actual_response_bytes = backend
1411 .do_any_request(async move {
1412 let bytes: alloy_primitives::Bytes =
1413 provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1414 Ok(bytes)
1415 })
1416 .expect("failed performing any request");
1417
1418 assert_eq!(actual_response_bytes, expected_response_bytes);
1419
1420 server_handle.join().unwrap();
1421 }
1422}