1use crate::{
4 cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5 error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9 network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
10 Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16 stream::Stream,
17 task::{Context, Poll},
18 Future, FutureExt,
19};
20use revm::{
21 db::DatabaseRef,
22 primitives::{
23 map::{hash_map::Entry, AddressHashMap, HashMap},
24 AccountInfo, Bytecode, KECCAK_EMPTY,
25 },
26};
27use std::{
28 collections::VecDeque,
29 fmt,
30 future::IntoFuture,
31 path::Path,
32 pin::Pin,
33 sync::{
34 mpsc::{channel as oneshot_channel, Sender as OneshotSender},
35 Arc,
36 },
37};
38
/// Hint logged by the `DatabaseRef` implementation of `SharedBackend` after a failed lookup whose
/// error reports that it was possibly caused by querying a non-archive node.
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";
43
/// Boxed future of an account lookup: resolves to the `(balance, nonce, code)` result together
/// with the address that was queried.
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
/// Boxed future of a storage lookup: resolves to the slot value result plus the queried address
/// and slot index.
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
/// Boxed future of a block hash lookup: resolves to the hash result plus the queried block number.
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
/// Boxed future of a full block lookup. The future carries the response sender with it and hands
/// it back alongside the (optional) block and the queried block id.
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
/// Boxed future of a transaction lookup. The future carries the response sender with it and hands
/// it back alongside the transaction result and the queried hash.
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;
55
// Response channels handed to the handler together with a request. They are `std::sync::mpsc`
// senders (imported as `OneshotSender`); each one is used to deliver a single result.

/// Sender for the result of an account lookup.
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
/// Sender for the result of a storage slot lookup.
type StorageSender = OneshotSender<DatabaseResult<U256>>;
/// Sender for the result of a block hash lookup.
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
/// Sender for the result of a full block lookup.
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
/// Sender for the result of a transaction lookup.
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;
61
/// Account info keyed by address; payload of `BackendRequest::UpdateAddress`.
type AddressData = AddressHashMap<AccountInfo>;
/// Per-address storage keyed by address; payload of `BackendRequest::UpdateStorage`.
type StorageData = AddressHashMap<StorageInfo>;
/// Block hashes keyed by block number (as `U256`); payload of `BackendRequest::UpdateBlockHash`.
type BlockHashData = HashMap<U256, B256>;
65
/// An arbitrary caller-supplied future paired with the channel its output is sent on once it
/// completes.
struct AnyRequestFuture<T, Err> {
    /// Receives the future's output when it resolves.
    sender: OneshotSender<Result<T, Err>>,
    /// The future to drive to completion.
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}
70
71impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
74 }
75}
76
/// Type-erased view of an [`AnyRequestFuture`], so requests with different output types can be
/// stored and polled uniformly by the handler.
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    /// Polls the wrapped request; returns `Poll::Ready(())` once it has finished.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
80
81impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
90where
91 T: fmt::Debug + Send + 'static,
92 Err: fmt::Debug + Send + 'static,
93{
94 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
95 match self.future.poll_unpin(cx) {
96 Poll::Ready(result) => {
97 let _ = self.sender.send(result);
98 Poll::Ready(())
99 }
100 Poll::Pending => Poll::Pending,
101 }
102 }
103}
104
/// A request to the provider that is currently in flight, i.e. a future the handler still has to
/// poll to completion.
enum ProviderRequest<Err> {
    /// Fetches balance, nonce and code of an account.
    Account(AccountFuture<Err>),
    /// Fetches a single storage slot.
    Storage(StorageFuture<Err>),
    /// Fetches the hash of a block by number.
    BlockHash(BlockHashFuture<Err>),
    /// Fetches a full block.
    FullBlock(FullBlockFuture<Err>),
    /// Fetches a transaction by hash.
    Transaction(TransactionFuture<Err>),
    /// Drives an arbitrary caller-supplied future.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
114
/// Messages sent from a `SharedBackend` to the `BackendHandler` over the request channel.
#[derive(Debug)]
enum BackendRequest {
    /// Look up the account info of the address and answer on the sender.
    Basic(Address, AccountInfoSender),
    /// Look up the storage slot of the address and answer on the sender.
    Storage(Address, U256, StorageSender),
    /// Look up the hash of the block with the given number and answer on the sender.
    BlockHash(u64, BlockHashSender),
    /// Fetch the full block and answer on the sender.
    FullBlock(BlockId, FullBlockSender),
    /// Fetch the transaction with the given hash and answer on the sender.
    Transaction(B256, TransactionSender),
    /// Replace the block id used for subsequent account and storage requests.
    SetPinnedBlock(BlockId),

    /// Insert or overwrite cached account infos.
    UpdateAddress(AddressData),
    /// Insert or overwrite the cached storage of whole addresses.
    UpdateStorage(StorageData),
    /// Insert or overwrite cached block hashes.
    UpdateBlockHash(BlockHashData),
    /// Drive an arbitrary future on the handler.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}
140
/// Future that receives [`BackendRequest`]s, answers them from the cache where possible and
/// otherwise fetches the data through the provider.
///
/// It completes once every sender of the request channel has been dropped.
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    /// Provider used to fetch data that is not cached.
    provider: P,
    /// Cache that lookups are served from and fetched results are written to.
    db: BlockchainDb,
    /// Provider requests currently in flight.
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners waiting for the in-flight account request of an address.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for the in-flight request of an `(address, slot)` pair.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for the in-flight hash request of a block number.
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Receiving half of the request channel.
    incoming: UnboundedReceiver<BackendRequest>,
    /// Requests taken from `incoming` that have not been processed yet.
    queued_requests: VecDeque<BackendRequest>,
    /// Block at which account and storage data is requested; when `None`, requests use
    /// `BlockId::default()`.
    block_id: Option<BlockId>,
}
166
167impl<P> BackendHandler<P>
168where
169 P: Provider<AnyNetwork> + Clone + Unpin + 'static,
170{
171 fn new(
172 provider: P,
173 db: BlockchainDb,
174 rx: UnboundedReceiver<BackendRequest>,
175 block_id: Option<BlockId>,
176 ) -> Self {
177 Self {
178 provider,
179 db,
180 pending_requests: Default::default(),
181 account_requests: Default::default(),
182 storage_requests: Default::default(),
183 block_requests: Default::default(),
184 queued_requests: Default::default(),
185 incoming: rx,
186 block_id,
187 }
188 }
189
190 fn on_request(&mut self, req: BackendRequest) {
197 match req {
198 BackendRequest::Basic(addr, sender) => {
199 trace!(target: "backendhandler", "received request basic address={:?}", addr);
200 let acc = self.db.accounts().read().get(&addr).cloned();
201 if let Some(basic) = acc {
202 let _ = sender.send(Ok(basic));
203 } else {
204 self.request_account(addr, sender);
205 }
206 }
207 BackendRequest::BlockHash(number, sender) => {
208 let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
209 if let Some(hash) = hash {
210 let _ = sender.send(Ok(hash));
211 } else {
212 self.request_hash(number, sender);
213 }
214 }
215 BackendRequest::FullBlock(number, sender) => {
216 self.request_full_block(number, sender);
217 }
218 BackendRequest::Transaction(tx, sender) => {
219 self.request_transaction(tx, sender);
220 }
221 BackendRequest::Storage(addr, idx, sender) => {
222 let value =
224 self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
225 if let Some(value) = value {
226 let _ = sender.send(Ok(value));
227 } else {
228 self.request_account_storage(addr, idx, sender);
230 }
231 }
232 BackendRequest::SetPinnedBlock(block_id) => {
233 self.block_id = Some(block_id);
234 }
235 BackendRequest::UpdateAddress(address_data) => {
236 for (address, data) in address_data {
237 self.db.accounts().write().insert(address, data);
238 }
239 }
240 BackendRequest::UpdateStorage(storage_data) => {
241 for (address, data) in storage_data {
242 self.db.storage().write().insert(address, data);
243 }
244 }
245 BackendRequest::UpdateBlockHash(block_hash_data) => {
246 for (block, hash) in block_hash_data {
247 self.db.block_hashes().write().insert(block, hash);
248 }
249 }
250 BackendRequest::AnyRequest(fut) => {
251 self.pending_requests.push(ProviderRequest::AnyRequest(fut));
252 }
253 }
254 }
255
256 fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
258 match self.storage_requests.entry((address, idx)) {
259 Entry::Occupied(mut entry) => {
260 entry.get_mut().push(listener);
261 }
262 Entry::Vacant(entry) => {
263 trace!(target: "backendhandler", %address, %idx, "preparing storage request");
264 entry.insert(vec![listener]);
265 let provider = self.provider.clone();
266 let block_id = self.block_id.unwrap_or_default();
267 let fut = Box::pin(async move {
268 let storage = provider
269 .get_storage_at(address, idx)
270 .block_id(block_id)
271 .await
272 .map_err(Into::into);
273 (storage, address, idx)
274 });
275 self.pending_requests.push(ProviderRequest::Storage(fut));
276 }
277 }
278 }
279
280 fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
282 trace!(target: "backendhandler", "preparing account request, address={:?}", address);
283 let provider = self.provider.clone();
284 let block_id = self.block_id.unwrap_or_default();
285 let fut = Box::pin(async move {
286 let balance = provider.get_balance(address).block_id(block_id).into_future();
287 let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
288 let code = provider.get_code_at(address).block_id(block_id).into_future();
289 let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
290 (resp, address)
291 });
292 ProviderRequest::Account(fut)
293 }
294
295 fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
297 match self.account_requests.entry(address) {
298 Entry::Occupied(mut entry) => {
299 entry.get_mut().push(listener);
300 }
301 Entry::Vacant(entry) => {
302 entry.insert(vec![listener]);
303 self.pending_requests.push(self.get_account_req(address));
304 }
305 }
306 }
307
308 fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
310 let provider = self.provider.clone();
311 let fut = Box::pin(async move {
312 let block = provider
313 .get_block(number)
314 .full()
315 .await
316 .wrap_err(format!("could not fetch block {number:?}"));
317 (sender, block, number)
318 });
319
320 self.pending_requests.push(ProviderRequest::FullBlock(fut));
321 }
322
323 fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
325 let provider = self.provider.clone();
326 let fut = Box::pin(async move {
327 let block = provider
328 .get_transaction_by_hash(tx)
329 .await
330 .wrap_err_with(|| format!("could not get transaction {tx}"))
331 .and_then(|maybe| {
332 maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
333 });
334 (sender, block, tx)
335 });
336
337 self.pending_requests.push(ProviderRequest::Transaction(fut));
338 }
339
340 fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
342 match self.block_requests.entry(number) {
343 Entry::Occupied(mut entry) => {
344 entry.get_mut().push(listener);
345 }
346 Entry::Vacant(entry) => {
347 trace!(target: "backendhandler", number, "preparing block hash request");
348 entry.insert(vec![listener]);
349 let provider = self.provider.clone();
350 let fut = Box::pin(async move {
351 let block = provider
352 .get_block_by_number(number.into())
353 .hashes()
354 .await
355 .wrap_err("failed to get block");
356
357 let block_hash = match block {
358 Ok(Some(block)) => Ok(block.header.hash),
359 Ok(None) => {
360 warn!(target: "backendhandler", ?number, "block not found");
361 Ok(KECCAK_EMPTY)
364 }
365 Err(err) => {
366 error!(target: "backendhandler", %err, ?number, "failed to get block");
367 Err(err)
368 }
369 };
370 (block_hash, number)
371 });
372 self.pending_requests.push(ProviderRequest::BlockHash(fut));
373 }
374 }
375 }
376}
377
/// Drives the handler: processes queued requests, receives new ones and polls every in-flight
/// provider request, writing results to the cache and notifying the waiting listeners.
impl<P> Future for BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    type Output = ();

    /// Returns `Poll::Ready(())` once all senders of the request channel are dropped (even if
    /// provider requests are still in flight); otherwise `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // Process everything that was queued during the previous iteration.
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // Move all currently available requests from the channel into the queue.
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // Poll every in-flight request exactly once. Each request is taken out with
            // `swap_remove` and pushed back at the end if it is still pending; iterating in
            // reverse ensures the element swapped into slot `n` was already visited and the
            // not-yet-visited slots below `n` are left untouched. A `continue` drops the
            // finished request instead of re-queueing it.
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    // Share one error between all listeners of this address.
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Empty code maps to the empty-code hash without hashing.
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                            };
                            // Cache the account before answering the listeners.
                            pin.db.accounts().write().insert(addr, acc.clone());

                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Cache the slot, creating the address entry if needed.
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // Cache the hash under the block number widened to `U256`.
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::FullBlock(fut) => {
                        // Full blocks are not cached here; the result goes only to the sender
                        // carried by the future.
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        // Transactions are not cached here either.
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        // The wrapped request sends its own response when it resolves.
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // Still pending: keep it for the next poll.
                pin.pending_requests.push(request);
            }

            // Nothing new was queued while polling: wait for a wake-up. Otherwise loop again to
            // process the queued requests.
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}
564
/// How a `SharedBackend` executes the blocking part of a request (sending it and waiting for the
/// response).
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Run the blocking section inside `tokio::task::block_in_place`. This is the default.
    #[default]
    BlockInPlace,
    /// Run the blocking section directly on the calling thread.
    Block,
}
579
580impl BlockingMode {
581 pub fn run<F, R>(&self, f: F) -> R
583 where
584 F: FnOnce() -> R,
585 {
586 match self {
587 Self::BlockInPlace => tokio::task::block_in_place(f),
588 Self::Block => f(),
589 }
590 }
591}
592
/// Cloneable, synchronous handle to a `BackendHandler`.
///
/// Every lookup is sent to the handler over a channel and the calling thread blocks until the
/// response arrives; the `DatabaseRef` implementation is built on top of these lookups.
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// Sending half of the request channel to the handler.
    backend: UnboundedSender<BackendRequest>,
    /// Shared handle to the cache, used for flushing and for direct read access to cached data.
    cache: Arc<FlushJsonBlockCacheDB>,

    /// How the blocking wait for a response is executed.
    blocking_mode: BlockingMode,
}
634
635impl SharedBackend {
636 pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
644 where
645 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
646 {
647 let (shared, handler) = Self::new(provider, db, pin_block);
648 trace!(target: "backendhandler", "spawning Backendhandler task");
650 tokio::spawn(handler);
651 shared
652 }
653
654 pub fn spawn_backend_thread<P>(
657 provider: P,
658 db: BlockchainDb,
659 pin_block: Option<BlockId>,
660 ) -> Self
661 where
662 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
663 {
664 let (shared, handler) = Self::new(provider, db, pin_block);
665
666 std::thread::Builder::new()
669 .name("fork-backend".into())
670 .spawn(move || {
671 let rt = tokio::runtime::Builder::new_current_thread()
672 .enable_all()
673 .build()
674 .expect("failed to build tokio runtime");
675
676 rt.block_on(handler);
677 })
678 .expect("failed to spawn thread");
679 trace!(target: "backendhandler", "spawned Backendhandler thread");
680
681 shared
682 }
683
684 pub fn new<P>(
686 provider: P,
687 db: BlockchainDb,
688 pin_block: Option<BlockId>,
689 ) -> (Self, BackendHandler<P>)
690 where
691 P: Provider<AnyNetwork> + Unpin + 'static + Clone,
692 {
693 let (backend, backend_rx) = unbounded();
694 let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
695 let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
696 (Self { backend, cache, blocking_mode: Default::default() }, handler)
697 }
698
699 pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
701 Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
702 }
703
704 pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
706 let req = BackendRequest::SetPinnedBlock(block.into());
707 self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
708 }
709
710 pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
712 self.blocking_mode.run(|| {
713 let (sender, rx) = oneshot_channel();
714 let req = BackendRequest::FullBlock(block.into(), sender);
715 self.backend.unbounded_send(req)?;
716 rx.recv()?
717 })
718 }
719
720 pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
722 self.blocking_mode.run(|| {
723 let (sender, rx) = oneshot_channel();
724 let req = BackendRequest::Transaction(tx, sender);
725 self.backend.unbounded_send(req)?;
726 rx.recv()?
727 })
728 }
729
730 fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
731 self.blocking_mode.run(|| {
732 let (sender, rx) = oneshot_channel();
733 let req = BackendRequest::Basic(address, sender);
734 self.backend.unbounded_send(req)?;
735 rx.recv()?.map(Some)
736 })
737 }
738
739 fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
740 self.blocking_mode.run(|| {
741 let (sender, rx) = oneshot_channel();
742 let req = BackendRequest::Storage(address, index, sender);
743 self.backend.unbounded_send(req)?;
744 rx.recv()?
745 })
746 }
747
748 fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
749 self.blocking_mode.run(|| {
750 let (sender, rx) = oneshot_channel();
751 let req = BackendRequest::BlockHash(number, sender);
752 self.backend.unbounded_send(req)?;
753 rx.recv()?
754 })
755 }
756
757 pub fn insert_or_update_address(&self, address_data: AddressData) {
759 let req = BackendRequest::UpdateAddress(address_data);
760 let err = self.backend.unbounded_send(req);
761 match err {
762 Ok(_) => (),
763 Err(e) => {
764 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
765 }
766 }
767 }
768
769 pub fn insert_or_update_storage(&self, storage_data: StorageData) {
771 let req = BackendRequest::UpdateStorage(storage_data);
772 let err = self.backend.unbounded_send(req);
773 match err {
774 Ok(_) => (),
775 Err(e) => {
776 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
777 }
778 }
779 }
780
781 pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
783 let req = BackendRequest::UpdateBlockHash(block_hash_data);
784 let err = self.backend.unbounded_send(req);
785 match err {
786 Ok(_) => (),
787 Err(e) => {
788 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
789 }
790 }
791 }
792
793 pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
795 where
796 F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
797 T: fmt::Debug + Send + 'static,
798 {
799 self.blocking_mode.run(|| {
800 let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
801 let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
802 sender,
803 future: Box::pin(fut),
804 }));
805 self.backend.unbounded_send(req)?;
806 rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
807 })
808 }
809
810 pub fn flush_cache(&self) {
812 self.cache.0.flush();
813 }
814
815 pub fn flush_cache_to(&self, cache_path: &Path) {
817 self.cache.0.flush_to(cache_path);
818 }
819
820 pub fn data(&self) -> Arc<MemDb> {
822 self.cache.0.db().clone()
823 }
824
825 pub fn accounts(&self) -> AddressData {
827 self.cache.0.db().accounts.read().clone()
828 }
829
830 pub fn accounts_len(&self) -> usize {
832 self.cache.0.db().accounts.read().len()
833 }
834
835 pub fn storage(&self) -> StorageData {
837 self.cache.0.db().storage.read().clone()
838 }
839
840 pub fn storage_len(&self) -> usize {
842 self.cache.0.db().storage.read().len()
843 }
844
845 pub fn block_hashes(&self) -> BlockHashData {
847 self.cache.0.db().block_hashes.read().clone()
848 }
849
850 pub fn block_hashes_len(&self) -> usize {
852 self.cache.0.db().block_hashes.read().len()
853 }
854}
855
856impl DatabaseRef for SharedBackend {
857 type Error = DatabaseError;
858
859 fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
860 trace!(target: "sharedbackend", %address, "request basic");
861 self.do_get_basic(address).map_err(|err| {
862 error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
863 if err.is_possibly_non_archive_node_error() {
864 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
865 }
866 err
867 })
868 }
869
870 fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
871 Err(DatabaseError::MissingCode(hash))
872 }
873
874 fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
875 trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
876 self.do_get_storage(address, index).map_err(|err| {
877 error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
878 if err.is_possibly_non_archive_node_error() {
879 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
880 }
881 err
882 })
883 }
884
885 fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
886 trace!(target: "sharedbackend", "request block hash for number {:?}", number);
887 self.do_get_block_hash(number).map_err(|err| {
888 error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
889 if err.is_possibly_non_archive_node_error() {
890 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
891 }
892 err
893 })
894 }
895}
896
897#[cfg(test)]
898mod tests {
899 use super::*;
900 use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
901 use alloy_provider::ProviderBuilder;
902 use alloy_rpc_client::ClientBuilder;
903 use serde::Deserialize;
904 use std::{collections::BTreeSet, fs, path::PathBuf};
905 use tiny_http::{Response, Server};
906
907 pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
908 ProviderBuilder::new()
909 .network::<AnyNetwork>()
910 .on_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
911 }
912
913 const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
914
915 #[tokio::test(flavor = "multi_thread")]
916 async fn test_builder() {
917 let Some(endpoint) = ENDPOINT else { return };
918 let provider = get_http_provider(endpoint);
919
920 let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
921 let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
922 }
923
924 #[tokio::test(flavor = "multi_thread")]
925 async fn shared_backend() {
926 let Some(endpoint) = ENDPOINT else { return };
927
928 let provider = get_http_provider(endpoint);
929 let meta = BlockchainDbMeta {
930 cfg_env: Default::default(),
931 block_env: Default::default(),
932 hosts: BTreeSet::from([endpoint.to_string()]),
933 };
934
935 let db = BlockchainDb::new(meta, None);
936 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
937
938 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
940
941 let idx = U256::from(0u64);
942 let value = backend.storage_ref(address, idx).unwrap();
943 let account = backend.basic_ref(address).unwrap().unwrap();
944
945 let mem_acc = db.accounts().read().get(&address).unwrap().clone();
946 assert_eq!(account.balance, mem_acc.balance);
947 assert_eq!(account.nonce, mem_acc.nonce);
948 let slots = db.storage().read().get(&address).unwrap().clone();
949 assert_eq!(slots.len(), 1);
950 assert_eq!(slots.get(&idx).copied().unwrap(), value);
951
952 let num = 10u64;
953 let hash = backend.block_hash_ref(num).unwrap();
954 let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
955 assert_eq!(hash, mem_hash);
956
957 let max_slots = 5;
958 let handle = std::thread::spawn(move || {
959 for i in 1..max_slots {
960 let idx = U256::from(i);
961 let _ = backend.storage_ref(address, idx);
962 }
963 });
964 handle.join().unwrap();
965 let slots = db.storage().read().get(&address).unwrap().clone();
966 assert_eq!(slots.len() as u64, max_slots);
967 }
968
969 #[test]
970 fn can_read_cache() {
971 let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
972 let json = JsonBlockCacheDB::load(cache_path).unwrap();
973 assert!(!json.db().accounts.read().is_empty());
974 }
975
976 #[tokio::test(flavor = "multi_thread")]
977 async fn can_modify_address() {
978 let Some(endpoint) = ENDPOINT else { return };
979
980 let provider = get_http_provider(endpoint);
981 let meta = BlockchainDbMeta {
982 cfg_env: Default::default(),
983 block_env: Default::default(),
984 hosts: BTreeSet::from([endpoint.to_string()]),
985 };
986
987 let db = BlockchainDb::new(meta, None);
988 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
989
990 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
992
993 let new_acc = AccountInfo {
994 nonce: 1000u64,
995 balance: U256::from(2000),
996 code: None,
997 code_hash: KECCAK_EMPTY,
998 };
999 let mut account_data = AddressData::default();
1000 account_data.insert(address, new_acc.clone());
1001
1002 backend.insert_or_update_address(account_data);
1003
1004 let max_slots = 5;
1005 let handle = std::thread::spawn(move || {
1006 for i in 1..max_slots {
1007 let idx = U256::from(i);
1008 let result_address = backend.basic_ref(address).unwrap();
1009 match result_address {
1010 Some(acc) => {
1011 assert_eq!(
1012 acc.nonce, new_acc.nonce,
1013 "The nonce was not changed in instance of index {}",
1014 idx
1015 );
1016 assert_eq!(
1017 acc.balance, new_acc.balance,
1018 "The balance was not changed in instance of index {}",
1019 idx
1020 );
1021
1022 let db_address = {
1024 let accounts = db.accounts().read();
1025 accounts.get(&address).unwrap().clone()
1026 };
1027
1028 assert_eq!(
1029 db_address.nonce, new_acc.nonce,
1030 "The nonce was not changed in instance of index {}",
1031 idx
1032 );
1033 assert_eq!(
1034 db_address.balance, new_acc.balance,
1035 "The balance was not changed in instance of index {}",
1036 idx
1037 );
1038 }
1039 None => panic!("Account not found"),
1040 }
1041 }
1042 });
1043 handle.join().unwrap();
1044 }
1045
1046 #[tokio::test(flavor = "multi_thread")]
1047 async fn can_modify_storage() {
1048 let Some(endpoint) = ENDPOINT else { return };
1049
1050 let provider = get_http_provider(endpoint);
1051 let meta = BlockchainDbMeta {
1052 cfg_env: Default::default(),
1053 block_env: Default::default(),
1054 hosts: BTreeSet::from([endpoint.to_string()]),
1055 };
1056
1057 let db = BlockchainDb::new(meta, None);
1058 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1059
1060 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1062
1063 let mut storage_data = StorageData::default();
1064 let mut storage_info = StorageInfo::default();
1065 storage_info.insert(U256::from(20), U256::from(10));
1066 storage_info.insert(U256::from(30), U256::from(15));
1067 storage_info.insert(U256::from(40), U256::from(20));
1068
1069 storage_data.insert(address, storage_info);
1070
1071 backend.insert_or_update_storage(storage_data.clone());
1072
1073 let max_slots = 5;
1074 let handle = std::thread::spawn(move || {
1075 for _ in 1..max_slots {
1076 for (address, info) in &storage_data {
1077 for (index, value) in info {
1078 let result_storage = backend.do_get_storage(*address, *index);
1079 match result_storage {
1080 Ok(stg_db) => {
1081 assert_eq!(
1082 stg_db, *value,
1083 "Storage in slot number {} in address {} do not have the same value", index, address
1084 );
1085
1086 let db_result = {
1087 let storage = db.storage().read();
1088 let address_storage = storage.get(address).unwrap();
1089 *address_storage.get(index).unwrap()
1090 };
1091
1092 assert_eq!(
1093 stg_db, db_result,
1094 "Storage in slot number {} in address {} do not have the same value", index, address
1095 )
1096 }
1097
1098 Err(err) => {
1099 panic!("There was a database error: {}", err)
1100 }
1101 }
1102 }
1103 }
1104 }
1105 });
1106 handle.join().unwrap();
1107 }
1108
1109 #[tokio::test(flavor = "multi_thread")]
1110 async fn can_modify_block_hashes() {
1111 let Some(endpoint) = ENDPOINT else { return };
1112
1113 let provider = get_http_provider(endpoint);
1114 let meta = BlockchainDbMeta {
1115 cfg_env: Default::default(),
1116 block_env: Default::default(),
1117 hosts: BTreeSet::from([endpoint.to_string()]),
1118 };
1119
1120 let db = BlockchainDb::new(meta, None);
1121 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1122
1123 let mut block_hash_data = BlockHashData::default();
1127 block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1128 block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1129 block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1130 block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1131 block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1132
1133 backend.insert_or_update_block_hashes(block_hash_data.clone());
1134
1135 let max_slots: u64 = 5;
1136 let handle = std::thread::spawn(move || {
1137 for i in 1..max_slots {
1138 let key = U256::from(i);
1139 let result_hash = backend.do_get_block_hash(i);
1140 match result_hash {
1141 Ok(hash) => {
1142 assert_eq!(
1143 hash,
1144 *block_hash_data.get(&key).unwrap(),
1145 "The hash in block {} did not match",
1146 key
1147 );
1148
1149 let db_result = {
1150 let hashes = db.block_hashes().read();
1151 *hashes.get(&key).unwrap()
1152 };
1153
1154 assert_eq!(hash, db_result, "The hash in block {} did not match", key);
1155 }
1156 Err(err) => panic!("Hash not found, error: {}", err),
1157 }
1158 }
1159 });
1160 handle.join().unwrap();
1161 }
1162
1163 #[tokio::test(flavor = "multi_thread")]
1164 async fn can_modify_storage_with_cache() {
1165 let Some(endpoint) = ENDPOINT else { return };
1166
1167 let provider = get_http_provider(endpoint);
1168 let meta = BlockchainDbMeta {
1169 cfg_env: Default::default(),
1170 block_env: Default::default(),
1171 hosts: BTreeSet::from([endpoint.to_string()]),
1172 };
1173
1174 fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1176
1177 let cache_path =
1178 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1179
1180 let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1181 let backend =
1182 SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1183
1184 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1186
1187 let mut storage_data = StorageData::default();
1188 let mut storage_info = StorageInfo::default();
1189 storage_info.insert(U256::from(1), U256::from(10));
1190 storage_info.insert(U256::from(2), U256::from(15));
1191 storage_info.insert(U256::from(3), U256::from(20));
1192 storage_info.insert(U256::from(4), U256::from(20));
1193 storage_info.insert(U256::from(5), U256::from(15));
1194 storage_info.insert(U256::from(6), U256::from(10));
1195
1196 let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1197 address_data.code = None;
1198
1199 storage_data.insert(address, storage_info);
1200
1201 backend.insert_or_update_storage(storage_data.clone());
1202
1203 let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1204 new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1206
1207 let mut account_data = AddressData::default();
1208 account_data.insert(address, new_acc.clone());
1209
1210 backend.insert_or_update_address(account_data);
1211
1212 let backend_clone = backend.clone();
1213
1214 let max_slots = 5;
1215 let handle = std::thread::spawn(move || {
1216 for _ in 1..max_slots {
1217 for (address, info) in &storage_data {
1218 for (index, value) in info {
1219 let result_storage = backend.do_get_storage(*address, *index);
1220 match result_storage {
1221 Ok(stg_db) => {
1222 assert_eq!(
1223 stg_db, *value,
1224 "Storage in slot number {} in address {} doesn't have the same value", index, address
1225 );
1226
1227 let db_result = {
1228 let storage = db.storage().read();
1229 let address_storage = storage.get(address).unwrap();
1230 *address_storage.get(index).unwrap()
1231 };
1232
1233 assert_eq!(
1234 stg_db, db_result,
1235 "Storage in slot number {} in address {} doesn't have the same value", index, address
1236 );
1237 }
1238
1239 Err(err) => {
1240 panic!("There was a database error: {}", err)
1241 }
1242 }
1243 }
1244 }
1245 }
1246
1247 backend_clone.flush_cache();
1248 });
1249 handle.join().unwrap();
1250
1251 let cache_path =
1254 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1255
1256 let json_db = BlockchainDb::new(meta, Some(cache_path));
1257
1258 let mut storage_data = StorageData::default();
1259 let mut storage_info = StorageInfo::default();
1260 storage_info.insert(U256::from(1), U256::from(10));
1261 storage_info.insert(U256::from(2), U256::from(15));
1262 storage_info.insert(U256::from(3), U256::from(20));
1263 storage_info.insert(U256::from(4), U256::from(20));
1264 storage_info.insert(U256::from(5), U256::from(15));
1265 storage_info.insert(U256::from(6), U256::from(10));
1266
1267 storage_data.insert(address, storage_info);
1268
1269 let max_slots = 5;
1271 let handle = std::thread::spawn(move || {
1272 for _ in 1..max_slots {
1273 for (address, info) in &storage_data {
1274 for (index, value) in info {
1275 let result_storage = {
1276 let storage = json_db.storage().read();
1277 let address_storage = storage.get(address).unwrap().clone();
1278 *address_storage.get(index).unwrap()
1279 };
1280
1281 assert_eq!(
1282 result_storage, *value,
1283 "Storage in slot number {} in address {} doesn't have the same value",
1284 index, address
1285 );
1286 }
1287 }
1288 }
1289 });
1290
1291 handle.join().unwrap();
1292
1293 fs::remove_file("test-data/storage-tmp.json").unwrap();
1295 }
1296
1297 #[tokio::test(flavor = "multi_thread")]
1298 async fn shared_backend_any_request() {
1299 let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1300 let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1301 let endpoint = format!("http://{}", server.server_addr());
1302
1303 let expected_bytes_innner = expected_response_bytes.clone();
1305 let server_handle = std::thread::spawn(move || {
1306 #[derive(Debug, Deserialize)]
1307 struct Request {
1308 method: String,
1309 }
1310 let mut request = server.recv().unwrap();
1311 let rpc_request: Request =
1312 serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1313
1314 match rpc_request.method.as_str() {
1315 "foo_callCustomMethod" => request
1316 .respond(Response::from_string(format!(
1317 r#"{{"result": "{}"}}"#,
1318 alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1319 )))
1320 .unwrap(),
1321 _ => request
1322 .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1323 .unwrap(),
1324 };
1325 });
1326
1327 let provider = get_http_provider(&endpoint);
1328 let meta = BlockchainDbMeta {
1329 cfg_env: Default::default(),
1330 block_env: Default::default(),
1331 hosts: BTreeSet::from([endpoint.to_string()]),
1332 };
1333
1334 let db = BlockchainDb::new(meta, None);
1335 let provider_inner = provider.clone();
1336 let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1337
1338 let actual_response_bytes = backend
1339 .do_any_request(async move {
1340 let bytes: alloy_primitives::Bytes =
1341 provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1342 Ok(bytes)
1343 })
1344 .expect("failed performing any request");
1345
1346 assert_eq!(actual_response_bytes, expected_response_bytes);
1347
1348 server_handle.join().unwrap();
1349 }
1350}