1use crate::{
4 cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
5 error::{DatabaseError, DatabaseResult},
6};
7use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
8use alloy_provider::{
9 network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
10 DynProvider, Provider,
11};
12use alloy_rpc_types::BlockId;
13use eyre::WrapErr;
14use futures::{
15 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
16 pin_mut,
17 stream::Stream,
18 task::{Context, Poll},
19 Future, FutureExt,
20};
21use revm::{
22 database::DatabaseRef,
23 primitives::{
24 map::{hash_map::Entry, AddressHashMap, HashMap},
25 KECCAK_EMPTY,
26 },
27 state::{AccountInfo, Bytecode},
28};
29use std::{
30 collections::VecDeque,
31 fmt,
32 future::IntoFuture,
33 path::Path,
34 pin::Pin,
35 sync::{
36 atomic::{AtomicU8, Ordering},
37 mpsc::{channel as oneshot_channel, Sender as OneshotSender},
38 Arc,
39 },
40};
41use tokio::select;
42
43pub const NON_ARCHIVE_NODE_WARNING: &str = "\
45It looks like you're trying to fork from an older block with a non-archive node which is not \
46supported. Please try to change your RPC url to an archive node if the issue persists.";
47
48type AccountFuture<Err> =
51 Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
52type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
53type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
54type FullBlockFuture<Err> = Pin<
55 Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
56>;
57type TransactionFuture<Err> =
58 Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;
59
60type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
61type StorageSender = OneshotSender<DatabaseResult<U256>>;
62type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
63type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
64type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;
65
66type AddressData = AddressHashMap<AccountInfo>;
67type StorageData = AddressHashMap<StorageInfo>;
68type BlockHashData = HashMap<U256, B256>;
69
70const ACCOUNT_FETCH_UNCHECKED: u8 = 0;
72const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1;
75const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2;
77
78struct AnyRequestFuture<T, Err> {
79 sender: OneshotSender<Result<T, Err>>,
80 future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
81}
82
83impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
86 }
87}
88
89trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
90 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
91}
92
93impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
102where
103 T: fmt::Debug + Send + 'static,
104 Err: fmt::Debug + Send + 'static,
105{
106 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
107 match self.future.poll_unpin(cx) {
108 Poll::Ready(result) => {
109 let _ = self.sender.send(result);
110 Poll::Ready(())
111 }
112 Poll::Pending => Poll::Pending,
113 }
114 }
115}
116
117enum ProviderRequest<Err> {
119 Account(AccountFuture<Err>),
120 Storage(StorageFuture<Err>),
121 BlockHash(BlockHashFuture<Err>),
122 FullBlock(FullBlockFuture<Err>),
123 Transaction(TransactionFuture<Err>),
124 AnyRequest(Box<dyn WrappedAnyRequest>),
125}
126
127#[derive(Debug)]
129enum BackendRequest {
130 Basic(Address, AccountInfoSender),
132 Storage(Address, U256, StorageSender),
134 BlockHash(u64, BlockHashSender),
136 FullBlock(BlockId, FullBlockSender),
138 Transaction(B256, TransactionSender),
140 SetPinnedBlock(BlockId),
142
143 UpdateAddress(AddressData),
145 UpdateStorage(StorageData),
147 UpdateBlockHash(BlockHashData),
149 AnyRequest(Box<dyn WrappedAnyRequest>),
151}
152
153#[must_use = "futures do nothing unless polled"]
158pub struct BackendHandler {
159 provider: DynProvider<AnyNetwork>,
160 db: BlockchainDb,
162 pending_requests: Vec<ProviderRequest<eyre::Report>>,
164 account_requests: HashMap<Address, Vec<AccountInfoSender>>,
166 storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
168 block_requests: HashMap<u64, Vec<BlockHashSender>>,
170 incoming: UnboundedReceiver<BackendRequest>,
172 queued_requests: VecDeque<BackendRequest>,
174 block_id: Option<BlockId>,
177 account_fetch_mode: Arc<AtomicU8>,
179}
180
181impl BackendHandler {
182 fn new(
183 provider: DynProvider<AnyNetwork>,
184 db: BlockchainDb,
185 rx: UnboundedReceiver<BackendRequest>,
186 block_id: Option<BlockId>,
187 ) -> Self {
188 Self {
189 provider,
190 db,
191 pending_requests: Default::default(),
192 account_requests: Default::default(),
193 storage_requests: Default::default(),
194 block_requests: Default::default(),
195 queued_requests: Default::default(),
196 incoming: rx,
197 block_id,
198 account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)),
199 }
200 }
201
202 fn on_request(&mut self, req: BackendRequest) {
209 match req {
210 BackendRequest::Basic(addr, sender) => {
211 trace!(target: "backendhandler", "received request basic address={:?}", addr);
212 let acc = self.db.accounts().read().get(&addr).cloned();
213 if let Some(basic) = acc {
214 let _ = sender.send(Ok(basic));
215 } else {
216 self.request_account(addr, sender);
217 }
218 }
219 BackendRequest::BlockHash(number, sender) => {
220 let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
221 if let Some(hash) = hash {
222 let _ = sender.send(Ok(hash));
223 } else {
224 self.request_hash(number, sender);
225 }
226 }
227 BackendRequest::FullBlock(number, sender) => {
228 self.request_full_block(number, sender);
229 }
230 BackendRequest::Transaction(tx, sender) => {
231 self.request_transaction(tx, sender);
232 }
233 BackendRequest::Storage(addr, idx, sender) => {
234 let value =
236 self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
237 if let Some(value) = value {
238 let _ = sender.send(Ok(value));
239 } else {
240 self.request_account_storage(addr, idx, sender);
242 }
243 }
244 BackendRequest::SetPinnedBlock(block_id) => {
245 self.block_id = Some(block_id);
246 }
247 BackendRequest::UpdateAddress(address_data) => {
248 for (address, data) in address_data {
249 self.db.accounts().write().insert(address, data);
250 }
251 }
252 BackendRequest::UpdateStorage(storage_data) => {
253 for (address, data) in storage_data {
254 self.db.storage().write().insert(address, data);
255 }
256 }
257 BackendRequest::UpdateBlockHash(block_hash_data) => {
258 for (block, hash) in block_hash_data {
259 self.db.block_hashes().write().insert(block, hash);
260 }
261 }
262 BackendRequest::AnyRequest(fut) => {
263 self.pending_requests.push(ProviderRequest::AnyRequest(fut));
264 }
265 }
266 }
267
268 fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
270 match self.storage_requests.entry((address, idx)) {
271 Entry::Occupied(mut entry) => {
272 entry.get_mut().push(listener);
273 }
274 Entry::Vacant(entry) => {
275 trace!(target: "backendhandler", %address, %idx, "preparing storage request");
276 entry.insert(vec![listener]);
277 let provider = self.provider.clone();
278 let block_id = self.block_id.unwrap_or_default();
279 let fut = Box::pin(async move {
280 let storage = provider
281 .get_storage_at(address, idx)
282 .block_id(block_id)
283 .await
284 .map_err(Into::into);
285 (storage, address, idx)
286 });
287 self.pending_requests.push(ProviderRequest::Storage(fut));
288 }
289 }
290 }
291
292 fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
294 trace!(target: "backendhandler", "preparing account request, address={:?}", address);
295
296 let provider = self.provider.clone();
297 let block_id = self.block_id.unwrap_or_default();
298 let mode = Arc::clone(&self.account_fetch_mode);
299 let fut = async move {
300 let initial_mode = mode.load(Ordering::Relaxed);
302 match initial_mode {
303 ACCOUNT_FETCH_UNCHECKED => {
304 let acc_info_fut =
306 provider.get_account_info(address).block_id(block_id).into_future();
307
308 let balance_fut =
310 provider.get_balance(address).block_id(block_id).into_future();
311 let nonce_fut =
312 provider.get_transaction_count(address).block_id(block_id).into_future();
313 let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
314 let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut);
315 pin_mut!(acc_info_fut, triple_fut);
316
317 select! {
318 acc_info = &mut acc_info_fut => {
319 match acc_info {
320 Ok(info) => {
321 trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo");
322 mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed);
323 Ok((info.balance, info.nonce, info.code))
324 }
325 Err(err) => {
326 trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call");
327 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
328 Ok(triple_fut.await?)
329 }
330 }
331 }
332 triple = &mut triple_fut => {
333 match triple {
334 Ok((balance, nonce, code)) => {
335 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
336 Ok((balance, nonce, code))
337 }
338 Err(err) => Err(err.into())
339 }
340 }
341 }
342 }
343
344 ACCOUNT_FETCH_SUPPORTS_ACC_INFO => {
345 let mut res = provider
346 .get_account_info(address)
347 .block_id(block_id)
348 .into_future()
349 .await
350 .map(|info| (info.balance, info.nonce, info.code));
351
352 if res.is_err() {
355 mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed);
356
357 let balance_fut =
358 provider.get_balance(address).block_id(block_id).into_future();
359 let nonce_fut = provider
360 .get_transaction_count(address)
361 .block_id(block_id)
362 .into_future();
363 let code_fut =
364 provider.get_code_at(address).block_id(block_id).into_future();
365 res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await;
366 }
367
368 Ok(res?)
369 }
370
371 ACCOUNT_FETCH_SEPARATE_REQUESTS => {
372 let balance_fut =
373 provider.get_balance(address).block_id(block_id).into_future();
374 let nonce_fut =
375 provider.get_transaction_count(address).block_id(block_id).into_future();
376 let code_fut = provider.get_code_at(address).block_id(block_id).into_future();
377
378 Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?)
379 }
380
381 _ => unreachable!("Invalid account fetch mode"),
382 }
383 };
384
385 ProviderRequest::Account(Box::pin(async move {
386 let result = fut.await;
387 (result, address)
388 }))
389 }
390
391 fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
393 match self.account_requests.entry(address) {
394 Entry::Occupied(mut entry) => {
395 entry.get_mut().push(listener);
396 }
397 Entry::Vacant(entry) => {
398 entry.insert(vec![listener]);
399 self.pending_requests.push(self.get_account_req(address));
400 }
401 }
402 }
403
404 fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
406 let provider = self.provider.clone();
407 let fut = Box::pin(async move {
408 let block = provider
409 .get_block(number)
410 .full()
411 .await
412 .wrap_err(format!("could not fetch block {number:?}"));
413 (sender, block, number)
414 });
415
416 self.pending_requests.push(ProviderRequest::FullBlock(fut));
417 }
418
419 fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
421 let provider = self.provider.clone();
422 let fut = Box::pin(async move {
423 let block = provider
424 .get_transaction_by_hash(tx)
425 .await
426 .wrap_err_with(|| format!("could not get transaction {tx}"))
427 .and_then(|maybe| {
428 maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
429 });
430 (sender, block, tx)
431 });
432
433 self.pending_requests.push(ProviderRequest::Transaction(fut));
434 }
435
436 fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
438 match self.block_requests.entry(number) {
439 Entry::Occupied(mut entry) => {
440 entry.get_mut().push(listener);
441 }
442 Entry::Vacant(entry) => {
443 trace!(target: "backendhandler", number, "preparing block hash request");
444 entry.insert(vec![listener]);
445 let provider = self.provider.clone();
446 let fut = Box::pin(async move {
447 let block = provider
448 .get_block_by_number(number.into())
449 .hashes()
450 .await
451 .wrap_err("failed to get block");
452
453 let block_hash = match block {
454 Ok(Some(block)) => Ok(block.header.hash),
455 Ok(None) => {
456 warn!(target: "backendhandler", ?number, "block not found");
457 Ok(KECCAK_EMPTY)
460 }
461 Err(err) => {
462 error!(target: "backendhandler", %err, ?number, "failed to get block");
463 Err(err)
464 }
465 };
466 (block_hash, number)
467 });
468 self.pending_requests.push(ProviderRequest::BlockHash(fut));
469 }
470 }
471 }
472}
473
474impl Future for BackendHandler {
475 type Output = ();
476
477 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
478 let pin = self.get_mut();
479 loop {
480 while let Some(req) = pin.queued_requests.pop_front() {
482 pin.on_request(req)
483 }
484
485 loop {
487 match Pin::new(&mut pin.incoming).poll_next(cx) {
488 Poll::Ready(Some(req)) => {
489 pin.queued_requests.push_back(req);
490 }
491 Poll::Ready(None) => {
492 trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
493 return Poll::Ready(());
494 }
495 Poll::Pending => break,
496 }
497 }
498
499 for n in (0..pin.pending_requests.len()).rev() {
501 let mut request = pin.pending_requests.swap_remove(n);
502 match &mut request {
503 ProviderRequest::Account(fut) => {
504 if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
505 let (balance, nonce, code) = match resp {
507 Ok(res) => res,
508 Err(err) => {
509 let err = Arc::new(err);
510 if let Some(listeners) = pin.account_requests.remove(&addr) {
511 listeners.into_iter().for_each(|l| {
512 let _ = l.send(Err(DatabaseError::GetAccount(
513 addr,
514 Arc::clone(&err),
515 )));
516 })
517 }
518 continue;
519 }
520 };
521
522 let (code, code_hash) = if !code.is_empty() {
524 (code.clone(), keccak256(&code))
525 } else {
526 (Bytes::default(), KECCAK_EMPTY)
527 };
528
529 let acc = AccountInfo {
531 nonce,
532 balance,
533 code: Some(Bytecode::new_raw(code)),
534 code_hash,
535 account_id: None,
536 };
537 pin.db.accounts().write().insert(addr, acc.clone());
538
539 if let Some(listeners) = pin.account_requests.remove(&addr) {
541 listeners.into_iter().for_each(|l| {
542 let _ = l.send(Ok(acc.clone()));
543 })
544 }
545 continue;
546 }
547 }
548 ProviderRequest::Storage(fut) => {
549 if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
550 let value = match resp {
551 Ok(value) => value,
552 Err(err) => {
553 let err = Arc::new(err);
555 if let Some(listeners) =
556 pin.storage_requests.remove(&(addr, idx))
557 {
558 listeners.into_iter().for_each(|l| {
559 let _ = l.send(Err(DatabaseError::GetStorage(
560 addr,
561 idx,
562 Arc::clone(&err),
563 )));
564 })
565 }
566 continue;
567 }
568 };
569
570 pin.db.storage().write().entry(addr).or_default().insert(idx, value);
572
573 if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
575 listeners.into_iter().for_each(|l| {
576 let _ = l.send(Ok(value));
577 })
578 }
579 continue;
580 }
581 }
582 ProviderRequest::BlockHash(fut) => {
583 if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
584 let value = match block_hash {
585 Ok(value) => value,
586 Err(err) => {
587 let err = Arc::new(err);
588 if let Some(listeners) = pin.block_requests.remove(&number) {
590 listeners.into_iter().for_each(|l| {
591 let _ = l.send(Err(DatabaseError::GetBlockHash(
592 number,
593 Arc::clone(&err),
594 )));
595 })
596 }
597 continue;
598 }
599 };
600
601 pin.db.block_hashes().write().insert(U256::from(number), value);
603
604 if let Some(listeners) = pin.block_requests.remove(&number) {
606 listeners.into_iter().for_each(|l| {
607 let _ = l.send(Ok(value));
608 })
609 }
610 continue;
611 }
612 }
613 ProviderRequest::FullBlock(fut) => {
614 if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
615 let msg = match resp {
616 Ok(Some(block)) => Ok(block),
617 Ok(None) => Err(DatabaseError::BlockNotFound(number)),
618 Err(err) => {
619 let err = Arc::new(err);
620 Err(DatabaseError::GetFullBlock(number, err))
621 }
622 };
623 let _ = sender.send(msg);
624 continue;
625 }
626 }
627 ProviderRequest::Transaction(fut) => {
628 if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
629 let msg = match tx {
630 Ok(tx) => Ok(tx),
631 Err(err) => {
632 let err = Arc::new(err);
633 Err(DatabaseError::GetTransaction(tx_hash, err))
634 }
635 };
636 let _ = sender.send(msg);
637 continue;
638 }
639 }
640 ProviderRequest::AnyRequest(fut) => {
641 if fut.poll_inner(cx).is_ready() {
642 continue;
643 }
644 }
645 }
646 pin.pending_requests.push(request);
648 }
649
650 if pin.queued_requests.is_empty() {
653 return Poll::Pending;
654 }
655 }
656 }
657}
658
659#[derive(Default, Clone, Debug, PartialEq)]
662pub enum BlockingMode {
663 #[default]
667 BlockInPlace,
668 Block,
672}
673
674impl BlockingMode {
675 pub fn run<F, R>(&self, f: F) -> R
677 where
678 F: FnOnce() -> R,
679 {
680 match self {
681 Self::BlockInPlace => tokio::task::block_in_place(f),
682 Self::Block => f(),
683 }
684 }
685}
686
687#[derive(Clone, Debug)]
716pub struct SharedBackend {
717 backend: UnboundedSender<BackendRequest>,
719 cache: Arc<FlushJsonBlockCacheDB>,
724
725 blocking_mode: BlockingMode,
727}
728
729impl SharedBackend {
730 pub async fn spawn_backend<P: Provider<AnyNetwork> + 'static>(
736 provider: P,
737 db: BlockchainDb,
738 pin_block: Option<BlockId>,
739 ) -> Self {
740 let (shared, handler) = Self::new(provider, db, pin_block);
741 trace!(target: "backendhandler", "spawning Backendhandler task");
743 tokio::spawn(handler);
744 shared
745 }
746
747 pub fn spawn_backend_thread<P: Provider<AnyNetwork> + 'static>(
750 provider: P,
751 db: BlockchainDb,
752 pin_block: Option<BlockId>,
753 ) -> Self {
754 let (shared, handler) = Self::new(provider, db, pin_block);
755
756 std::thread::Builder::new()
759 .name("fork-backend".into())
760 .spawn(move || {
761 let rt = tokio::runtime::Builder::new_current_thread()
762 .enable_all()
763 .build()
764 .expect("failed to build tokio runtime");
765
766 rt.block_on(handler);
767 })
768 .expect("failed to spawn thread");
769 trace!(target: "backendhandler", "spawned Backendhandler thread");
770
771 shared
772 }
773
774 pub fn new<P: Provider<AnyNetwork> + 'static>(
776 provider: P,
777 db: BlockchainDb,
778 pin_block: Option<BlockId>,
779 ) -> (Self, BackendHandler) {
780 let (backend, backend_rx) = unbounded();
781 let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
782 let handler = BackendHandler::new(provider.erased(), db, backend_rx, pin_block);
783 (Self { backend, cache, blocking_mode: Default::default() }, handler)
784 }
785
786 pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
788 Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
789 }
790
791 pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
793 let req = BackendRequest::SetPinnedBlock(block.into());
794 self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
795 }
796
797 pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
799 self.blocking_mode.run(|| {
800 let (sender, rx) = oneshot_channel();
801 let req = BackendRequest::FullBlock(block.into(), sender);
802 self.backend.unbounded_send(req)?;
803 rx.recv()?
804 })
805 }
806
807 pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
809 self.blocking_mode.run(|| {
810 let (sender, rx) = oneshot_channel();
811 let req = BackendRequest::Transaction(tx, sender);
812 self.backend.unbounded_send(req)?;
813 rx.recv()?
814 })
815 }
816
817 fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
818 self.blocking_mode.run(|| {
819 let (sender, rx) = oneshot_channel();
820 let req = BackendRequest::Basic(address, sender);
821 self.backend.unbounded_send(req)?;
822 rx.recv()?.map(Some)
823 })
824 }
825
826 fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
827 self.blocking_mode.run(|| {
828 let (sender, rx) = oneshot_channel();
829 let req = BackendRequest::Storage(address, index, sender);
830 self.backend.unbounded_send(req)?;
831 rx.recv()?
832 })
833 }
834
835 fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
836 self.blocking_mode.run(|| {
837 let (sender, rx) = oneshot_channel();
838 let req = BackendRequest::BlockHash(number, sender);
839 self.backend.unbounded_send(req)?;
840 rx.recv()?
841 })
842 }
843
844 pub fn insert_or_update_address(&self, address_data: AddressData) {
846 let req = BackendRequest::UpdateAddress(address_data);
847 let err = self.backend.unbounded_send(req);
848 match err {
849 Ok(_) => (),
850 Err(e) => {
851 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
852 }
853 }
854 }
855
856 pub fn insert_or_update_storage(&self, storage_data: StorageData) {
858 let req = BackendRequest::UpdateStorage(storage_data);
859 let err = self.backend.unbounded_send(req);
860 match err {
861 Ok(_) => (),
862 Err(e) => {
863 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
864 }
865 }
866 }
867
868 pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
870 let req = BackendRequest::UpdateBlockHash(block_hash_data);
871 let err = self.backend.unbounded_send(req);
872 match err {
873 Ok(_) => (),
874 Err(e) => {
875 error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
876 }
877 }
878 }
879
880 pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
882 where
883 F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
884 T: fmt::Debug + Send + 'static,
885 {
886 self.blocking_mode.run(|| {
887 let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
888 let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
889 sender,
890 future: Box::pin(fut),
891 }));
892 self.backend.unbounded_send(req)?;
893 rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
894 })
895 }
896
897 pub fn flush_cache(&self) {
899 self.cache.0.flush();
900 }
901
902 pub fn flush_cache_to(&self, cache_path: &Path) {
904 self.cache.0.flush_to(cache_path);
905 }
906
907 pub fn data(&self) -> Arc<MemDb> {
909 self.cache.0.db().clone()
910 }
911
912 pub fn accounts(&self) -> AddressData {
914 self.cache.0.db().accounts.read().clone()
915 }
916
917 pub fn accounts_len(&self) -> usize {
919 self.cache.0.db().accounts.read().len()
920 }
921
922 pub fn storage(&self) -> StorageData {
924 self.cache.0.db().storage.read().clone()
925 }
926
927 pub fn storage_len(&self) -> usize {
929 self.cache.0.db().storage.read().len()
930 }
931
932 pub fn block_hashes(&self) -> BlockHashData {
934 self.cache.0.db().block_hashes.read().clone()
935 }
936
937 pub fn block_hashes_len(&self) -> usize {
939 self.cache.0.db().block_hashes.read().len()
940 }
941}
942
943impl DatabaseRef for SharedBackend {
944 type Error = DatabaseError;
945
946 fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
947 trace!(target: "sharedbackend", %address, "request basic");
948 self.do_get_basic(address).inspect_err(|err| {
949 error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
950 if err.is_possibly_non_archive_node_error() {
951 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
952 }
953 })
954 }
955
956 fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
957 Err(DatabaseError::MissingCode(hash))
958 }
959
960 fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
961 trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
962 self.do_get_storage(address, index).inspect_err(|err| {
963 error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
964 if err.is_possibly_non_archive_node_error() {
965 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
966 }
967 })
968 }
969
970 fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
971 trace!(target: "sharedbackend", "request block hash for number {:?}", number);
972 self.do_get_block_hash(number).inspect_err(|err| {
973 error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
974 if err.is_possibly_non_archive_node_error() {
975 error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
976 }
977 })
978 }
979}
980
981#[cfg(test)]
982mod tests {
983 use super::*;
984 use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
985 use alloy_consensus::BlockHeader;
986 use alloy_provider::ProviderBuilder;
987 use alloy_rpc_client::ClientBuilder;
988 use serde::Deserialize;
989 use std::{fs, path::PathBuf};
990 use tiny_http::{Response, Server};
991
992 pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
993 ProviderBuilder::new()
994 .network::<AnyNetwork>()
995 .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
996 }
997
998 const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
999
1000 #[tokio::test(flavor = "multi_thread")]
1001 async fn test_builder() {
1002 let Some(endpoint) = ENDPOINT else { return };
1003 let provider = get_http_provider(endpoint);
1004
1005 let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
1006 let meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
1007
1008 assert_eq!(meta.block_env.number, U256::from(any_rpc_block.header.number()));
1009 }
1010
1011 #[tokio::test(flavor = "multi_thread")]
1012 async fn shared_backend() {
1013 let Some(endpoint) = ENDPOINT else { return };
1014
1015 let provider = get_http_provider(endpoint);
1016 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1017
1018 let db = BlockchainDb::new(meta, None);
1019 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1020
1021 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1023
1024 let idx = U256::from(0u64);
1025 let value = backend.storage_ref(address, idx).unwrap();
1026 let account = backend.basic_ref(address).unwrap().unwrap();
1027
1028 let mem_acc = db.accounts().read().get(&address).unwrap().clone();
1029 assert_eq!(account.balance, mem_acc.balance);
1030 assert_eq!(account.nonce, mem_acc.nonce);
1031 let slots = db.storage().read().get(&address).unwrap().clone();
1032 assert_eq!(slots.len(), 1);
1033 assert_eq!(slots.get(&idx).copied().unwrap(), value);
1034
1035 let num = 10u64;
1036 let hash = backend.block_hash_ref(num).unwrap();
1037 let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
1038 assert_eq!(hash, mem_hash);
1039
1040 let max_slots = 5;
1041 let handle = std::thread::spawn(move || {
1042 for i in 1..max_slots {
1043 let idx = U256::from(i);
1044 let _ = backend.storage_ref(address, idx);
1045 }
1046 });
1047 handle.join().unwrap();
1048 let slots = db.storage().read().get(&address).unwrap().clone();
1049 assert_eq!(slots.len() as u64, max_slots);
1050 }
1051
1052 #[test]
1053 fn can_read_cache() {
1054 let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
1055 let json = JsonBlockCacheDB::load(cache_path).unwrap();
1056 assert!(!json.db().accounts.read().is_empty());
1057 }
1058
1059 #[tokio::test(flavor = "multi_thread")]
1060 async fn can_modify_address() {
1061 let Some(endpoint) = ENDPOINT else { return };
1062
1063 let provider = get_http_provider(endpoint);
1064 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1065
1066 let db = BlockchainDb::new(meta, None);
1067 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1068
1069 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1071
1072 let new_acc = AccountInfo {
1073 nonce: 1000u64,
1074 balance: U256::from(2000),
1075 code: None,
1076 code_hash: KECCAK_EMPTY,
1077 account_id: None,
1078 };
1079 let mut account_data = AddressData::default();
1080 account_data.insert(address, new_acc.clone());
1081
1082 backend.insert_or_update_address(account_data);
1083
1084 let max_slots = 5;
1085 let handle = std::thread::spawn(move || {
1086 for i in 1..max_slots {
1087 let idx = U256::from(i);
1088 let result_address = backend.basic_ref(address).unwrap();
1089 match result_address {
1090 Some(acc) => {
1091 assert_eq!(
1092 acc.nonce, new_acc.nonce,
1093 "The nonce was not changed in instance of index {idx}"
1094 );
1095 assert_eq!(
1096 acc.balance, new_acc.balance,
1097 "The balance was not changed in instance of index {idx}"
1098 );
1099
1100 let db_address = {
1102 let accounts = db.accounts().read();
1103 accounts.get(&address).unwrap().clone()
1104 };
1105
1106 assert_eq!(
1107 db_address.nonce, new_acc.nonce,
1108 "The nonce was not changed in instance of index {idx}"
1109 );
1110 assert_eq!(
1111 db_address.balance, new_acc.balance,
1112 "The balance was not changed in instance of index {idx}"
1113 );
1114 }
1115 None => panic!("Account not found"),
1116 }
1117 }
1118 });
1119 handle.join().unwrap();
1120 }
1121
1122 #[tokio::test(flavor = "multi_thread")]
1123 async fn can_modify_storage() {
1124 let Some(endpoint) = ENDPOINT else { return };
1125
1126 let provider = get_http_provider(endpoint);
1127 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1128
1129 let db = BlockchainDb::new(meta, None);
1130 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1131
1132 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1134
1135 let mut storage_data = StorageData::default();
1136 let mut storage_info = StorageInfo::default();
1137 storage_info.insert(U256::from(20), U256::from(10));
1138 storage_info.insert(U256::from(30), U256::from(15));
1139 storage_info.insert(U256::from(40), U256::from(20));
1140
1141 storage_data.insert(address, storage_info);
1142
1143 backend.insert_or_update_storage(storage_data.clone());
1144
1145 let max_slots = 5;
1146 let handle = std::thread::spawn(move || {
1147 for _ in 1..max_slots {
1148 for (address, info) in &storage_data {
1149 for (index, value) in info {
1150 let result_storage = backend.do_get_storage(*address, *index);
1151 match result_storage {
1152 Ok(stg_db) => {
1153 assert_eq!(
1154 stg_db, *value,
1155 "Storage in slot number {index} in address {address} do not have the same value"
1156 );
1157
1158 let db_result = {
1159 let storage = db.storage().read();
1160 let address_storage = storage.get(address).unwrap();
1161 *address_storage.get(index).unwrap()
1162 };
1163
1164 assert_eq!(
1165 stg_db, db_result,
1166 "Storage in slot number {index} in address {address} do not have the same value"
1167 )
1168 }
1169
1170 Err(err) => {
1171 panic!("There was a database error: {err}")
1172 }
1173 }
1174 }
1175 }
1176 }
1177 });
1178 handle.join().unwrap();
1179 }
1180
1181 #[tokio::test(flavor = "multi_thread")]
1182 async fn can_modify_block_hashes() {
1183 let Some(endpoint) = ENDPOINT else { return };
1184
1185 let provider = get_http_provider(endpoint);
1186 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1187
1188 let db = BlockchainDb::new(meta, None);
1189 let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1190
1191 let mut block_hash_data = BlockHashData::default();
1195 block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
1196 block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
1197 block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
1198 block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
1199 block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
1200
1201 backend.insert_or_update_block_hashes(block_hash_data.clone());
1202
1203 let max_slots: u64 = 5;
1204 let handle = std::thread::spawn(move || {
1205 for i in 1..max_slots {
1206 let key = U256::from(i);
1207 let result_hash = backend.do_get_block_hash(i);
1208 match result_hash {
1209 Ok(hash) => {
1210 assert_eq!(
1211 hash,
1212 *block_hash_data.get(&key).unwrap(),
1213 "The hash in block {key} did not match"
1214 );
1215
1216 let db_result = {
1217 let hashes = db.block_hashes().read();
1218 *hashes.get(&key).unwrap()
1219 };
1220
1221 assert_eq!(hash, db_result, "The hash in block {key} did not match");
1222 }
1223 Err(err) => panic!("Hash not found, error: {err}"),
1224 }
1225 }
1226 });
1227 handle.join().unwrap();
1228 }
1229
1230 #[tokio::test(flavor = "multi_thread")]
1231 async fn can_modify_storage_with_cache() {
1232 let Some(endpoint) = ENDPOINT else { return };
1233
1234 let provider = get_http_provider(endpoint);
1235 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1236
1237 fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
1239
1240 let cache_path =
1241 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1242
1243 let db = BlockchainDb::new(meta.clone(), Some(cache_path));
1244 let backend =
1245 SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
1246
1247 let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
1249
1250 let mut storage_data = StorageData::default();
1251 let mut storage_info = StorageInfo::default();
1252 storage_info.insert(U256::from(1), U256::from(10));
1253 storage_info.insert(U256::from(2), U256::from(15));
1254 storage_info.insert(U256::from(3), U256::from(20));
1255 storage_info.insert(U256::from(4), U256::from(20));
1256 storage_info.insert(U256::from(5), U256::from(15));
1257 storage_info.insert(U256::from(6), U256::from(10));
1258
1259 let mut address_data = backend.basic_ref(address).unwrap().unwrap();
1260 address_data.code = None;
1261
1262 storage_data.insert(address, storage_info);
1263
1264 backend.insert_or_update_storage(storage_data.clone());
1265
1266 let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
1267 new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
1269
1270 let mut account_data = AddressData::default();
1271 account_data.insert(address, new_acc.clone());
1272
1273 backend.insert_or_update_address(account_data);
1274
1275 let backend_clone = backend.clone();
1276
1277 let max_slots = 5;
1278 let handle = std::thread::spawn(move || {
1279 for _ in 1..max_slots {
1280 for (address, info) in &storage_data {
1281 for (index, value) in info {
1282 let result_storage = backend.do_get_storage(*address, *index);
1283 match result_storage {
1284 Ok(stg_db) => {
1285 assert_eq!(
1286 stg_db, *value,
1287 "Storage in slot number {index} in address {address} doesn't have the same value"
1288 );
1289
1290 let db_result = {
1291 let storage = db.storage().read();
1292 let address_storage = storage.get(address).unwrap();
1293 *address_storage.get(index).unwrap()
1294 };
1295
1296 assert_eq!(
1297 stg_db, db_result,
1298 "Storage in slot number {index} in address {address} doesn't have the same value"
1299 );
1300 }
1301
1302 Err(err) => {
1303 panic!("There was a database error: {err}")
1304 }
1305 }
1306 }
1307 }
1308 }
1309
1310 backend_clone.flush_cache();
1311 });
1312 handle.join().unwrap();
1313
1314 let cache_path =
1317 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
1318
1319 let json_db = BlockchainDb::new(meta, Some(cache_path));
1320
1321 let mut storage_data = StorageData::default();
1322 let mut storage_info = StorageInfo::default();
1323 storage_info.insert(U256::from(1), U256::from(10));
1324 storage_info.insert(U256::from(2), U256::from(15));
1325 storage_info.insert(U256::from(3), U256::from(20));
1326 storage_info.insert(U256::from(4), U256::from(20));
1327 storage_info.insert(U256::from(5), U256::from(15));
1328 storage_info.insert(U256::from(6), U256::from(10));
1329
1330 storage_data.insert(address, storage_info);
1331
1332 let max_slots = 5;
1334 let handle = std::thread::spawn(move || {
1335 for _ in 1..max_slots {
1336 for (address, info) in &storage_data {
1337 for (index, value) in info {
1338 let result_storage = {
1339 let storage = json_db.storage().read();
1340 let address_storage = storage.get(address).unwrap().clone();
1341 *address_storage.get(index).unwrap()
1342 };
1343
1344 assert_eq!(
1345 result_storage, *value,
1346 "Storage in slot number {index} in address {address} doesn't have the same value"
1347 );
1348 }
1349 }
1350 }
1351 });
1352
1353 handle.join().unwrap();
1354
1355 fs::remove_file("test-data/storage-tmp.json").unwrap();
1357 }
1358
1359 #[tokio::test(flavor = "multi_thread")]
1360 async fn shared_backend_any_request() {
1361 let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
1362 let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
1363 let endpoint = format!("http://{}", server.server_addr());
1364
1365 let expected_bytes_innner = expected_response_bytes.clone();
1367 let server_handle = std::thread::spawn(move || {
1368 #[derive(Debug, Deserialize)]
1369 struct Request {
1370 method: String,
1371 }
1372 let mut request = server.recv().unwrap();
1373 let rpc_request: Request =
1374 serde_json::from_reader(request.as_reader()).expect("failed parsing request");
1375
1376 match rpc_request.method.as_str() {
1377 "foo_callCustomMethod" => request
1378 .respond(Response::from_string(format!(
1379 r#"{{"result": "{}"}}"#,
1380 alloy_primitives::hex::encode_prefixed(expected_bytes_innner),
1381 )))
1382 .unwrap(),
1383 _ => request
1384 .respond(Response::from_string(r#"{"error": "invalid request"}"#))
1385 .unwrap(),
1386 };
1387 });
1388
1389 let provider = get_http_provider(&endpoint);
1390 let meta = BlockchainDbMeta::new(Default::default(), endpoint.to_string());
1391
1392 let db = BlockchainDb::new(meta, None);
1393 let provider_inner = provider.clone();
1394 let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
1395
1396 let actual_response_bytes = backend
1397 .do_any_request(async move {
1398 let bytes: alloy_primitives::Bytes =
1399 provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
1400 Ok(bytes)
1401 })
1402 .expect("failed performing any request");
1403
1404 assert_eq!(actual_response_bytes, expected_response_bytes);
1405
1406 server_handle.join().unwrap();
1407 }
1408}