use crate::{
    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
    error::{DatabaseError, DatabaseResult},
};
use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
use alloy_provider::{
    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction, AnyTxEnvelope},
    Provider,
};
use alloy_rpc_types::{BlockId, Transaction};
use alloy_serde::WithOtherFields;
use eyre::WrapErr;
use futures::{
    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
    stream::Stream,
    task::{Context, Poll},
    Future, FutureExt,
};
use revm::{
    db::DatabaseRef,
    primitives::{
        map::{hash_map::Entry, AddressHashMap, HashMap},
        AccountInfo, Bytecode, KECCAK_EMPTY,
    },
};
use std::{
    collections::VecDeque,
    fmt,
    future::IntoFuture,
    path::Path,
    pin::Pin,
    sync::{
        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
        Arc,
    },
};

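/// Logged when an error indicates that the user is likely trying to fork from an older block with
/// a non-archive node.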
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";

type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

type AddressData = AddressHashMap<AccountInfo>;
type StorageData = AddressHashMap<StorageInfo>;
type BlockHashData = HashMap<U256, B256>;

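/// Pairs an arbitrary provider future with the sender through which its result is delivered back
/// to the caller.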
struct AnyRequestFuture<T, Err> {
    sender: OneshotSender<Result<T, Err>>,
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}

impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
    }
}

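/// Object-safe wrapper around an [`AnyRequestFuture`] so that requests with different output types
/// can be stored in the same collection.
///
/// `poll_inner` drives the inner future and, once it resolves, forwards the result through the
/// stored sender instead of returning it.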
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
where
    T: fmt::Debug + Send + 'static,
    Err: fmt::Debug + Send + 'static,
{
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(result) => {
                let _ = self.sender.send(result);
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

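/// A provider request that is currently in progress on the backend handler.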
enum ProviderRequest<Err> {
    Account(AccountFuture<Err>),
    Storage(StorageFuture<Err>),
    BlockHash(BlockHashFuture<Err>),
    FullBlock(FullBlockFuture<Err>),
    Transaction(TransactionFuture<Err>),
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

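/// Requests that can be sent to the [`BackendHandler`] by a [`SharedBackend`].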
#[derive(Debug)]
enum BackendRequest {
    /// Fetch the account info for the given address.
    Basic(Address, AccountInfoSender),
    /// Fetch the value of the storage slot at the given address.
    Storage(Address, U256, StorageSender),
    /// Fetch the hash of the block with the given number.
    BlockHash(u64, BlockHashSender),
    /// Fetch the full block for the given block identifier.
    FullBlock(BlockId, FullBlockSender),
    /// Fetch the transaction with the given hash.
    Transaction(B256, TransactionSender),
    /// Pin the block that all subsequent requests are made against.
    SetPinnedBlock(BlockId),

    /// Insert or update account data in the database.
    UpdateAddress(AddressData),
    /// Insert or update storage data in the database.
    UpdateStorage(StorageData),
    /// Insert or update block hashes in the database.
    UpdateBlockHash(BlockHashData),
    /// Execute an arbitrary provider request on the backend task.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

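/// Handles an internal provider and listens for incoming requests.
///
/// It answers requests from the local [`BlockchainDb`] when possible and otherwise fetches the
/// data from the remote endpoint, deduplicating identical requests that are already in flight.
/// The handler keeps running as long as at least one request sender is alive.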
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    provider: P,
    /// Stores all the fetched data.
    db: BlockchainDb,
    /// Provider requests currently in progress.
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners waiting for an account response.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for a storage response.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for a block hash response.
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming requests from all `SharedBackend` handles.
    incoming: UnboundedReceiver<BackendRequest>,
    /// Requests that have been received but not yet processed.
    queued_requests: VecDeque<BackendRequest>,
    /// The block to fetch data from.
    block_id: Option<BlockId>,
}

impl<P> BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    fn new(
        provider: P,
        db: BlockchainDb,
        rx: UnboundedReceiver<BackendRequest>,
        block_id: Option<BlockId>,
    ) -> Self {
        Self {
            provider,
            db,
            pending_requests: Default::default(),
            account_requests: Default::default(),
            storage_requests: Default::default(),
            block_requests: Default::default(),
            queued_requests: Default::default(),
            incoming: rx,
            block_id,
        }
    }

    /// Handles an incoming request: answers it from the local database if possible, otherwise
    /// queues a provider request.
    fn on_request(&mut self, req: BackendRequest) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }

    /// Requests the storage slot from the remote provider, reusing an in-flight request for the
    /// same (address, slot) pair if one exists.
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }

    /// Returns a new request that fetches the balance, nonce, and code for the given address.
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let fut = Box::pin(async move {
            let balance = provider.get_balance(address).block_id(block_id).into_future();
            let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
            let code = provider.get_code_at(address).block_id(block_id).into_future();
            let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
            (resp, address)
        });
        ProviderRequest::Account(fut)
    }

    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
        match self.account_requests.entry(address) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![listener]);
                self.pending_requests.push(self.get_account_req(address));
            }
        }
    }

    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_block(number, true.into())
                .await
                .wrap_err(format!("could not fetch block {number:?}"));
            (sender, block, number)
        });

        self.pending_requests.push(ProviderRequest::FullBlock(fut));
    }

    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_transaction_by_hash(tx)
                .await
                .wrap_err_with(|| format!("could not get transaction {tx}"))
                .and_then(|maybe| {
                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
                });
            (sender, block, tx)
        });

        self.pending_requests.push(ProviderRequest::Transaction(fut));
    }

    /// Requests the block hash for the given block number, reusing an in-flight request if one
    /// exists.
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(
                            number.into(),
                            alloy_rpc_types::BlockTransactionsKind::Hashes,
                        )
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header.hash),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // the block does not exist; fall back to the empty hash
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
}

impl<P> Future for BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // process all queued requests first
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // drain all incoming requests into the queue
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // poll all pending provider requests and dispatch finished results to the listeners
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // convert the response into revm types
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            // update the cache
                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                            };
                            pin.db.accounts().write().insert(addr, acc.clone());

                            // notify all listeners
                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            // notify all listeners
                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            // notify all listeners
                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // the request is not ready yet, put it back
                pin.pending_requests.push(request);
            }

            // if no new requests were queued in the meantime, there is nothing more to do
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}

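/// Determines how the [`SharedBackend`] executes its blocking requests against the async backend
/// task.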
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Wrap the blocking call in `tokio::task::block_in_place`, which requires a multi-threaded
    /// tokio runtime.
    #[default]
    BlockInPlace,
    /// Run the blocking call directly on the current thread.
    Block,
}

impl BlockingMode {
    /// Executes the given closure according to the configured mode.
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        match self {
            Self::BlockInPlace => tokio::task::block_in_place(f),
            Self::Block => f(),
        }
    }
}

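/// A cloneable handle to the database served by a [`BackendHandler`] running in a background task
/// or thread.
///
/// Requests made through the [`DatabaseRef`] implementation are sent over a channel to the
/// handler, which fetches missing data from the remote provider, stores it in the shared
/// [`BlockchainDb`], and sends the result back. All clones share the same handler and cache.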
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// Channel through which requests are sent to the `BackendHandler`.
    backend: UnboundedSender<BackendRequest>,
    /// Handle to the underlying JSON block cache, used for flushing cached data.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// Controls how blocking requests are executed.
    blocking_mode: BlockingMode,
}

impl SharedBackend {
    /// Spawns the `BackendHandler` as a task on the current tokio runtime and returns the
    /// `SharedBackend` connected to it.
    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);
        trace!(target: "backendhandler", "spawning Backendhandler task");
        tokio::spawn(handler);
        shared
    }

    /// Spawns a dedicated thread with its own single-threaded tokio runtime that drives the
    /// `BackendHandler`, and returns the connected `SharedBackend`.
    pub fn spawn_backend_thread<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);

        // spawn a light-weight thread with a thread-local async runtime just for
        // sending and receiving data from the remote client
        std::thread::Builder::new()
            .name("fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                rt.block_on(handler);
            })
            .expect("failed to spawn thread");
        trace!(target: "backendhandler", "spawned Backendhandler thread");

        shared
    }

    /// Returns a new `SharedBackend` together with the `BackendHandler` that serves it. The
    /// caller is responsible for driving the handler future to completion.
    pub fn new<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> (Self, BackendHandler<P>)
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (backend, backend_rx) = unbounded();
        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
        (Self { backend, cache, blocking_mode: Default::default() }, handler)
    }

    /// Returns a copy of this backend that executes blocking requests with the given mode.
    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
    }

    /// Updates the pinned block that all subsequent requests are made against.
    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
        let req = BackendRequest::SetPinnedBlock(block.into());
        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Returns the full block for the given block identifier.
    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::FullBlock(block.into(), sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Returns the transaction for the given hash.
    pub fn get_transaction(
        &self,
        tx: B256,
    ) -> DatabaseResult<WithOtherFields<Transaction<AnyTxEnvelope>>> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Transaction(tx, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Basic(address, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?.map(Some)
        })
    }

    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Storage(address, index, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::BlockHash(number, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Inserts or updates account data in the shared database.
    pub fn insert_or_update_address(&self, address_data: AddressData) {
        let req = BackendRequest::UpdateAddress(address_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
            }
        }
    }

    /// Inserts or updates storage data in the shared database.
    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
        let req = BackendRequest::UpdateStorage(storage_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e)
            }
        }
    }

    /// Inserts or updates block hashes in the shared database.
    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
        let req = BackendRequest::UpdateBlockHash(block_hash_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e)
            }
        }
    }

    /// Executes an arbitrary provider future on the backend task and blocks until the result is
    /// available.
    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
    where
        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
        T: fmt::Debug + Send + 'static,
    {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
                sender,
                future: Box::pin(fut),
            }));
            self.backend.unbounded_send(req)?;
            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
        })
    }

    /// Flushes the cached data to disk if a cache file is configured.
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the cached data to the given path.
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns the underlying in-memory database.
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns a clone of all cached accounts.
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the number of cached accounts.
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns a clone of all cached storage.
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the number of addresses with cached storage.
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns a clone of all cached block hashes.
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the number of cached block hashes.
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
}

impl DatabaseRef for SharedBackend {
    type Error = DatabaseError;

    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        trace!(target: "sharedbackend", %address, "request basic");
        self.do_get_basic(address).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
        Err(DatabaseError::MissingCode(hash))
    }

    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
        self.do_get_storage(address, index).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
        self.do_get_block_hash(number).map_err(|err| {
            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
    use alloy_provider::ProviderBuilder;
    use alloy_rpc_client::ClientBuilder;
    use serde::Deserialize;
    use std::{collections::BTreeSet, fs, path::PathBuf};
    use tiny_http::{Response, Server};

    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
        ProviderBuilder::new()
            .network::<AnyNetwork>()
            .on_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
    }

    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");

    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider
            .get_block(BlockId::latest(), alloy_rpc_types::BlockTransactionsKind::Hashes)
            .await
            .unwrap()
            .unwrap();
        let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }

    #[test]
    fn can_read_cache() {
        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
        let json = JsonBlockCacheDB::load(cache_path).unwrap();
        assert!(!json.db().accounts.read().is_empty());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );

                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} does not have the same value",
                                    index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} does not have the same value",
                                    index, address
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {} did not match",
                            key
                        );

                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {} did not match", key);
                    }
                    Err(err) => panic!("Hash not found, error: {}", err),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage_with_cache() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
        let backend =
            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
        address_data.code = None;

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));

        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let backend_clone = backend.clone();

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} doesn't have the same value",
                                    index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} doesn't have the same value",
                                    index, address
                                );
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }

            backend_clone.flush_cache();
        });
        handle.join().unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let json_db = BlockchainDb::new(meta, Some(cache_path));

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        storage_data.insert(address, storage_info);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = {
                            let storage = json_db.storage().read();
                            let address_storage = storage.get(address).unwrap().clone();
                            *address_storage.get(index).unwrap()
                        };

                        assert_eq!(
                            result_storage, *value,
                            "Storage in slot number {} in address {} doesn't have the same value",
                            index, address
                        );
                    }
                }
            }
        });

        handle.join().unwrap();

        fs::remove_file("test-data/storage-tmp.json").unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend_any_request() {
        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
        let endpoint = format!("http://{}", server.server_addr());

        let expected_bytes_inner = expected_response_bytes.clone();
        let server_handle = std::thread::spawn(move || {
            #[derive(Debug, Deserialize)]
            struct Request {
                method: String,
            }
            let mut request = server.recv().unwrap();
            let rpc_request: Request =
                serde_json::from_reader(request.as_reader()).expect("failed parsing request");

            match rpc_request.method.as_str() {
                "foo_callCustomMethod" => request
                    .respond(Response::from_string(format!(
                        r#"{{"result": "{}"}}"#,
                        alloy_primitives::hex::encode_prefixed(expected_bytes_inner),
                    )))
                    .unwrap(),
                _ => request
                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
                    .unwrap(),
            };
        });

        let provider = get_http_provider(&endpoint);
        let meta = BlockchainDbMeta {
            cfg_env: Default::default(),
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let provider_inner = provider.clone();
        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let actual_response_bytes = backend
            .do_any_request(async move {
                let bytes: alloy_primitives::Bytes =
                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
                Ok(bytes)
            })
            .expect("failed performing any request");

        assert_eq!(actual_response_bytes, expected_response_bytes);

        server_handle.join().unwrap();
    }
}