use crate::{
    cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
    error::{DatabaseError, DatabaseResult},
};
use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
use alloy_provider::{
    network::{AnyNetwork, AnyRpcBlock, AnyRpcTransaction},
    Provider,
};
use alloy_rpc_types::BlockId;
use eyre::WrapErr;
use futures::{
    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
    stream::Stream,
    task::{Context, Poll},
    Future, FutureExt,
};
use revm::{
    database::DatabaseRef,
    primitives::{
        map::{hash_map::Entry, AddressHashMap, HashMap},
        KECCAK_EMPTY,
    },
    state::{AccountInfo, Bytecode},
};
use std::{
    collections::VecDeque,
    fmt,
    future::IntoFuture,
    path::Path,
    pin::Pin,
    sync::{
        mpsc::{channel as oneshot_channel, Sender as OneshotSender},
        Arc,
    },
};
use tracing::{error, trace, warn};

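/// Logged when an error suggests that the user is trying to fork from an older block with a
/// non-archive node.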
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node which is not \
supported. Please try to change your RPC url to an archive node if the issue persists.";

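// Futures polled by the `BackendHandler`; each output carries enough context (address, slot,
// block number, hash, or the response channel) to route the result back to the waiting listeners.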
type AccountFuture<Err> =
    Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
type FullBlockFuture<Err> = Pin<
    Box<dyn Future<Output = (FullBlockSender, Result<Option<AnyRpcBlock>, Err>, BlockId)> + Send>,
>;
type TransactionFuture<Err> =
    Pin<Box<dyn Future<Output = (TransactionSender, Result<AnyRpcTransaction, Err>, B256)> + Send>>;

type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<AnyRpcBlock>>;
type TransactionSender = OneshotSender<DatabaseResult<AnyRpcTransaction>>;

type AddressData = AddressHashMap<AccountInfo>;
type StorageData = AddressHashMap<StorageInfo>;
type BlockHashData = HashMap<U256, B256>;

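/// Pairs an arbitrary provider future with the oneshot channel its result is delivered over.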
struct AnyRequestFuture<T, Err> {
    sender: OneshotSender<Result<T, Err>>,
    future: Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>,
}

impl<T, Err> fmt::Debug for AnyRequestFuture<T, Err> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("AnyRequestFuture").field(&self.sender).finish()
    }
}

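/// Object-safe wrapper that lets [`AnyRequestFuture`]s with different output types be stored
/// alongside the other pending requests; `poll_inner` both polls the future and forwards its
/// result to the paired sender.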
trait WrappedAnyRequest: Unpin + Send + fmt::Debug {
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

impl<T, Err> WrappedAnyRequest for AnyRequestFuture<T, Err>
where
    T: fmt::Debug + Send + 'static,
    Err: fmt::Debug + Send + 'static,
{
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(result) => {
                let _ = self.sender.send(result);
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

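/// The provider requests that are currently in flight.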
enum ProviderRequest<Err> {
    Account(AccountFuture<Err>),
    Storage(StorageFuture<Err>),
    BlockHash(BlockHashFuture<Err>),
    FullBlock(FullBlockFuture<Err>),
    Transaction(TransactionFuture<Err>),
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

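/// Requests that a [`SharedBackend`] can send to the [`BackendHandler`].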
#[derive(Debug)]
enum BackendRequest {
    /// Fetch the account info for the address.
    Basic(Address, AccountInfoSender),
    /// Fetch the value of the storage slot for the address.
    Storage(Address, U256, StorageSender),
    /// Fetch the hash of the block with the given number.
    BlockHash(u64, BlockHashSender),
    /// Fetch the entire block, including transactions.
    FullBlock(BlockId, FullBlockSender),
    /// Fetch the transaction with the given hash.
    Transaction(B256, TransactionSender),
    /// Pin the block to fetch data from.
    SetPinnedBlock(BlockId),

    /// Insert or update account data in the database.
    UpdateAddress(AddressData),
    /// Insert or update storage data in the database.
    UpdateStorage(StorageData),
    /// Insert or update block hashes in the database.
    UpdateBlockHash(BlockHashData),
    /// Execute an arbitrary provider request.
    AnyRequest(Box<dyn WrappedAnyRequest>),
}

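/// Handles an internal provider and listens for incoming [`BackendRequest`]s.
///
/// The handler stays alive for as long as the request channel is open, i.e. as long as at least
/// one [`SharedBackend`] still holds a sender. It deduplicates identical in-flight account,
/// storage, and block-hash requests, forwards each response to all waiting listeners, and writes
/// fetched data into the shared [`BlockchainDb`].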
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<P> {
    provider: P,
    /// Stores all the fetched data.
    db: BlockchainDb,
    /// Provider requests currently in progress.
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners waiting for an account info response.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for a storage slot response.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for a block hash response.
    block_requests: HashMap<u64, Vec<BlockHashSender>>,
    /// Incoming requests from all `SharedBackend` clones.
    incoming: UnboundedReceiver<BackendRequest>,
    /// Requests received while polling that have not been processed yet.
    queued_requests: VecDeque<BackendRequest>,
    /// The block to fetch data from, if pinned.
    block_id: Option<BlockId>,
}

impl<P> BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    fn new(
        provider: P,
        db: BlockchainDb,
        rx: UnboundedReceiver<BackendRequest>,
        block_id: Option<BlockId>,
    ) -> Self {
        Self {
            provider,
            db,
            pending_requests: Default::default(),
            account_requests: Default::default(),
            storage_requests: Default::default(),
            block_requests: Default::default(),
            queued_requests: Default::default(),
            incoming: rx,
            block_id,
        }
    }

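    /// Handles a request from a [`SharedBackend`]: answers it straight from the local database
    /// when the value is already cached, otherwise schedules a provider request for it.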
    fn on_request(&mut self, req: BackendRequest) {
        match req {
            BackendRequest::Basic(addr, sender) => {
                trace!(target: "backendhandler", "received request basic address={:?}", addr);
                let acc = self.db.accounts().read().get(&addr).cloned();
                if let Some(basic) = acc {
                    let _ = sender.send(Ok(basic));
                } else {
                    self.request_account(addr, sender);
                }
            }
            BackendRequest::BlockHash(number, sender) => {
                let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
                if let Some(hash) = hash {
                    let _ = sender.send(Ok(hash));
                } else {
                    self.request_hash(number, sender);
                }
            }
            BackendRequest::FullBlock(number, sender) => {
                self.request_full_block(number, sender);
            }
            BackendRequest::Transaction(tx, sender) => {
                self.request_transaction(tx, sender);
            }
            BackendRequest::Storage(addr, idx, sender) => {
                let value =
                    self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
                if let Some(value) = value {
                    let _ = sender.send(Ok(value));
                } else {
                    self.request_account_storage(addr, idx, sender);
                }
            }
            BackendRequest::SetPinnedBlock(block_id) => {
                self.block_id = Some(block_id);
            }
            BackendRequest::UpdateAddress(address_data) => {
                for (address, data) in address_data {
                    self.db.accounts().write().insert(address, data);
                }
            }
            BackendRequest::UpdateStorage(storage_data) => {
                for (address, data) in storage_data {
                    self.db.storage().write().insert(address, data);
                }
            }
            BackendRequest::UpdateBlockHash(block_hash_data) => {
                for (block, hash) in block_hash_data {
                    self.db.block_hashes().write().insert(block, hash);
                }
            }
            BackendRequest::AnyRequest(fut) => {
                self.pending_requests.push(ProviderRequest::AnyRequest(fut));
            }
        }
    }

    /// Processes a request for an account's storage slot.
    fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
        match self.storage_requests.entry((address, idx)) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", %address, %idx, "preparing storage request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let block_id = self.block_id.unwrap_or_default();
                let fut = Box::pin(async move {
                    let storage = provider
                        .get_storage_at(address, idx)
                        .block_id(block_id)
                        .await
                        .map_err(Into::into);
                    (storage, address, idx)
                });
                self.pending_requests.push(ProviderRequest::Storage(fut));
            }
        }
    }

    /// Returns the future that fetches the account data for `address`.
    fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
        trace!(target: "backendhandler", "preparing account request, address={:?}", address);
        let provider = self.provider.clone();
        let block_id = self.block_id.unwrap_or_default();
        let fut = Box::pin(async move {
            let balance = provider.get_balance(address).block_id(block_id).into_future();
            let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
            let code = provider.get_code_at(address).block_id(block_id).into_future();
            let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
            (resp, address)
        });
        ProviderRequest::Account(fut)
    }

    /// Processes a request for an account's info.
    fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
        match self.account_requests.entry(address) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![listener]);
                self.pending_requests.push(self.get_account_req(address));
            }
        }
    }

    /// Processes a request for an entire block.
    fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_block(number)
                .full()
                .await
                .wrap_err(format!("could not fetch block {number:?}"));
            (sender, block, number)
        });

        self.pending_requests.push(ProviderRequest::FullBlock(fut));
    }

    /// Processes a request for a transaction.
    fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
        let provider = self.provider.clone();
        let fut = Box::pin(async move {
            let block = provider
                .get_transaction_by_hash(tx)
                .await
                .wrap_err_with(|| format!("could not get transaction {tx}"))
                .and_then(|maybe| {
                    maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
                });
            (sender, block, tx)
        });

        self.pending_requests.push(ProviderRequest::Transaction(fut));
    }

    /// Processes a request for a block hash.
    fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
        match self.block_requests.entry(number) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().push(listener);
            }
            Entry::Vacant(entry) => {
                trace!(target: "backendhandler", number, "preparing block hash request");
                entry.insert(vec![listener]);
                let provider = self.provider.clone();
                let fut = Box::pin(async move {
                    let block = provider
                        .get_block_by_number(number.into())
                        .hashes()
                        .await
                        .wrap_err("failed to get block");

                    let block_hash = match block {
                        Ok(Some(block)) => Ok(block.header.hash),
                        Ok(None) => {
                            warn!(target: "backendhandler", ?number, "block not found");
                            // no block was returned; fall back to the empty hash
                            Ok(KECCAK_EMPTY)
                        }
                        Err(err) => {
                            error!(target: "backendhandler", %err, ?number, "failed to get block");
                            Err(err)
                        }
                    };
                    (block_hash, number)
                });
                self.pending_requests.push(ProviderRequest::BlockHash(fut));
            }
        }
    }
}

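// Polling a `BackendHandler` drains the queued requests, pulls new requests off the channel, and
// advances all in-flight provider requests, forwarding each result to its listeners. The future
// only completes once every `SharedBackend` sender has been dropped.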
impl<P> Future for BackendHandler<P>
where
    P: Provider<AnyNetwork> + Clone + Unpin + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = self.get_mut();
        loop {
            // process buffered requests first
            while let Some(req) = pin.queued_requests.pop_front() {
                pin.on_request(req)
            }

            // pull new requests off the channel
            loop {
                match Pin::new(&mut pin.incoming).poll_next(cx) {
                    Poll::Ready(Some(req)) => {
                        pin.queued_requests.push_back(req);
                    }
                    Poll::Ready(None) => {
                        trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
                        return Poll::Ready(());
                    }
                    Poll::Pending => break,
                }
            }

            // advance all in-flight provider requests
            for n in (0..pin.pending_requests.len()).rev() {
                let mut request = pin.pending_requests.swap_remove(n);
                match &mut request {
                    ProviderRequest::Account(fut) => {
                        if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
                            let (balance, nonce, code) = match resp {
                                Ok(res) => res,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.account_requests.remove(&addr) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetAccount(
                                                addr,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // convert the response into revm types
                            let (code, code_hash) = if !code.is_empty() {
                                (code.clone(), keccak256(&code))
                            } else {
                                (Bytes::default(), KECCAK_EMPTY)
                            };

                            // update the cache
                            let acc = AccountInfo {
                                nonce,
                                balance,
                                code: Some(Bytecode::new_raw(code)),
                                code_hash,
                            };
                            pin.db.accounts().write().insert(addr, acc.clone());

                            // notify all listeners
                            if let Some(listeners) = pin.account_requests.remove(&addr) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(acc.clone()));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::Storage(fut) => {
                        if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
                            let value = match resp {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) =
                                        pin.storage_requests.remove(&(addr, idx))
                                    {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetStorage(
                                                addr,
                                                idx,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.storage().write().entry(addr).or_default().insert(idx, value);

                            // notify all listeners
                            if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::BlockHash(fut) => {
                        if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
                            let value = match block_hash {
                                Ok(value) => value,
                                Err(err) => {
                                    let err = Arc::new(err);
                                    if let Some(listeners) = pin.block_requests.remove(&number) {
                                        listeners.into_iter().for_each(|l| {
                                            let _ = l.send(Err(DatabaseError::GetBlockHash(
                                                number,
                                                Arc::clone(&err),
                                            )));
                                        })
                                    }
                                    continue;
                                }
                            };

                            // update the cache
                            pin.db.block_hashes().write().insert(U256::from(number), value);

                            // notify all listeners
                            if let Some(listeners) = pin.block_requests.remove(&number) {
                                listeners.into_iter().for_each(|l| {
                                    let _ = l.send(Ok(value));
                                })
                            }
                            continue;
                        }
                    }
                    ProviderRequest::FullBlock(fut) => {
                        if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
                            let msg = match resp {
                                Ok(Some(block)) => Ok(block),
                                Ok(None) => Err(DatabaseError::BlockNotFound(number)),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetFullBlock(number, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::Transaction(fut) => {
                        if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
                            let msg = match tx {
                                Ok(tx) => Ok(tx),
                                Err(err) => {
                                    let err = Arc::new(err);
                                    Err(DatabaseError::GetTransaction(tx_hash, err))
                                }
                            };
                            let _ = sender.send(msg);
                            continue;
                        }
                    }
                    ProviderRequest::AnyRequest(fut) => {
                        if fut.poll_inner(cx).is_ready() {
                            continue;
                        }
                    }
                }
                // the request is not ready yet; put it back
                pin.pending_requests.push(request);
            }

            // if no new requests were queued, wait until woken again
            if pin.queued_requests.is_empty() {
                return Poll::Pending;
            }
        }
    }
}

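/// Determines how the [`SharedBackend`] blocks the current thread while waiting for the
/// [`BackendHandler`] to answer a request.
///
/// - [`BlockingMode::BlockInPlace`] wraps the wait in [`tokio::task::block_in_place`], which must
///   be used from a multi-threaded tokio runtime but keeps other tasks on the worker running.
/// - [`BlockingMode::Block`] blocks the calling thread directly and is suitable when the handler
///   runs on its own dedicated thread/runtime.
///
/// Defaults to [`BlockingMode::BlockInPlace`].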
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Wrap the blocking call in `tokio::task::block_in_place`.
    #[default]
    BlockInPlace,
    /// Block the current thread directly.
    Block,
}

impl BlockingMode {
    /// Runs the given closure according to the configured blocking mode.
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        match self {
            Self::BlockInPlace => tokio::task::block_in_place(f),
            Self::Block => f(),
        }
    }
}

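/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend is connected to the [`BackendHandler`] via an unbounded mpsc channel: all clones
/// of a `SharedBackend` talk to the same handler, which deduplicates identical in-flight account,
/// storage, and block-hash requests and caches the fetched data in the shared [`BlockchainDb`].
///
/// The [`DatabaseRef`] methods block the calling thread until the handler responds, so they
/// should not be called from the task that is driving the handler itself.
///
/// A minimal usage sketch (the endpoint URL, `meta`, and `address` are placeholders):
///
/// ```ignore
/// let provider = ProviderBuilder::new()
///     .network::<AnyNetwork>()
///     .connect_client(ClientBuilder::default().http("http://localhost:8545".parse()?));
/// let db = BlockchainDb::new(meta, None);
/// // spawns the handler on a dedicated `fork-backend` thread
/// let backend = SharedBackend::spawn_backend_thread(provider, db, None);
/// let account = backend.basic_ref(address)?;
/// ```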
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// Channel used to send requests to the `BackendHandler` that services them.
    backend: UnboundedSender<BackendRequest>,
    /// Handle to the JSON cache wrapper; used by `flush_cache` and the accessors below to read
    /// the in-memory data.
    cache: Arc<FlushJsonBlockCacheDB>,
    /// How the current thread is blocked while waiting for a response.
    blocking_mode: BlockingMode,
}

impl SharedBackend {
    /// Spawns a new [`BackendHandler`] on the current tokio runtime and returns a `SharedBackend`
    /// connected to it.
    ///
    /// The handler keeps running until the last `SharedBackend` (and therefore the last request
    /// sender) is dropped.
    pub async fn spawn_backend<P>(provider: P, db: BlockchainDb, pin_block: Option<BlockId>) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);
        trace!(target: "backendhandler", "spawning Backendhandler task");
        tokio::spawn(handler);
        shared
    }

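    /// Same as [`Self::spawn_backend`], but spawns the [`BackendHandler`] on a dedicated
    /// `fork-backend` thread running its own single-threaded tokio runtime, so no ambient
    /// runtime is required.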
    pub fn spawn_backend_thread<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> Self
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (shared, handler) = Self::new(provider, db, pin_block);

        // spawn a light-weight thread with a thread-local async runtime just for
        // sending and receiving data from the remote client
        std::thread::Builder::new()
            .name("fork-backend".into())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build tokio runtime");

                rt.block_on(handler);
            })
            .expect("failed to spawn thread");
        trace!(target: "backendhandler", "spawned Backendhandler thread");

        shared
    }

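    /// Returns a new `SharedBackend` together with the [`BackendHandler`] that serves it.
    ///
    /// The caller is responsible for driving the handler (e.g. by spawning it on a runtime); no
    /// request makes progress until the handler is polled.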
    pub fn new<P>(
        provider: P,
        db: BlockchainDb,
        pin_block: Option<BlockId>,
    ) -> (Self, BackendHandler<P>)
    where
        P: Provider<AnyNetwork> + Unpin + 'static + Clone,
    {
        let (backend, backend_rx) = unbounded();
        let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
        let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
        (Self { backend, cache, blocking_mode: Default::default() }, handler)
    }

    /// Returns a copy of this backend that uses the given [`BlockingMode`].
    pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
        Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
    }

    /// Updates the pinned block to fetch data from.
    pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
        let req = BackendRequest::SetPinnedBlock(block.into());
        self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
    }

    /// Returns the full block for the given block identifier.
    pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<AnyRpcBlock> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::FullBlock(block.into(), sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Returns the transaction for the given hash.
    pub fn get_transaction(&self, tx: B256) -> DatabaseResult<AnyRpcTransaction> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Transaction(tx, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Basic(address, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?.map(Some)
        })
    }

    fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::Storage(address, index, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel();
            let req = BackendRequest::BlockHash(number, sender);
            self.backend.unbounded_send(req)?;
            rx.recv()?
        })
    }

    /// Inserts or updates account data in the backend's database.
    pub fn insert_or_update_address(&self, address_data: AddressData) {
        let req = BackendRequest::UpdateAddress(address_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update address request: {:?}", e)
            }
        }
    }

    /// Inserts or updates storage data in the backend's database.
    pub fn insert_or_update_storage(&self, storage_data: StorageData) {
        let req = BackendRequest::UpdateStorage(storage_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e)
            }
        }
    }

    /// Inserts or updates block hashes in the backend's database.
    pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
        let req = BackendRequest::UpdateBlockHash(block_hash_data);
        let err = self.backend.unbounded_send(req);
        match err {
            Ok(_) => (),
            Err(e) => {
                error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e)
            }
        }
    }

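    /// Executes an arbitrary future on the backend handler and blocks until its result is
    /// available. This is useful for provider calls that the typed request variants don't cover
    /// (see the `shared_backend_any_request` test for an example).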
    pub fn do_any_request<T, F>(&mut self, fut: F) -> DatabaseResult<T>
    where
        F: Future<Output = Result<T, eyre::Report>> + Send + 'static,
        T: fmt::Debug + Send + 'static,
    {
        self.blocking_mode.run(|| {
            let (sender, rx) = oneshot_channel::<Result<T, eyre::Report>>();
            let req = BackendRequest::AnyRequest(Box::new(AnyRequestFuture {
                sender,
                future: Box::pin(fut),
            }));
            self.backend.unbounded_send(req)?;
            rx.recv()?.map_err(|err| DatabaseError::AnyRequest(Arc::new(err)))
        })
    }

    /// Flushes the cached data to the cache file, if any.
    pub fn flush_cache(&self) {
        self.cache.0.flush();
    }

    /// Flushes the cached data to the given file.
    pub fn flush_cache_to(&self, cache_path: &Path) {
        self.cache.0.flush_to(cache_path);
    }

    /// Returns the underlying in-memory database.
    pub fn data(&self) -> Arc<MemDb> {
        self.cache.0.db().clone()
    }

    /// Returns a clone of all cached accounts.
    pub fn accounts(&self) -> AddressData {
        self.cache.0.db().accounts.read().clone()
    }

    /// Returns the number of cached accounts.
    pub fn accounts_len(&self) -> usize {
        self.cache.0.db().accounts.read().len()
    }

    /// Returns a clone of all cached storage data.
    pub fn storage(&self) -> StorageData {
        self.cache.0.db().storage.read().clone()
    }

    /// Returns the number of addresses with cached storage.
    pub fn storage_len(&self) -> usize {
        self.cache.0.db().storage.read().len()
    }

    /// Returns a clone of all cached block hashes.
    pub fn block_hashes(&self) -> BlockHashData {
        self.cache.0.db().block_hashes.read().clone()
    }

    /// Returns the number of cached block hashes.
    pub fn block_hashes_len(&self) -> usize {
        self.cache.0.db().block_hashes.read().len()
    }
}

impl DatabaseRef for SharedBackend {
    type Error = DatabaseError;

    fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        trace!(target: "sharedbackend", %address, "request basic");
        self.do_get_basic(address).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
        Err(DatabaseError::MissingCode(hash))
    }

    fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
        trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
        self.do_get_storage(address, index).map_err(|err| {
            error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }

    fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
        trace!(target: "sharedbackend", "request block hash for number {:?}", number);
        self.do_get_block_hash(number).map_err(|err| {
            error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
            if err.is_possibly_non_archive_node_error() {
                error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
            }
            err
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
    use alloy_provider::ProviderBuilder;
    use alloy_rpc_client::ClientBuilder;
    use serde::Deserialize;
    use std::{collections::BTreeSet, fs, path::PathBuf};
    use tiny_http::{Response, Server};

    pub fn get_http_provider(endpoint: &str) -> impl Provider<AnyNetwork> + Clone {
        ProviderBuilder::new()
            .network::<AnyNetwork>()
            .connect_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
    }

    const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");

    #[tokio::test(flavor = "multi_thread")]
    async fn test_builder() {
        let Some(endpoint) = ENDPOINT else { return };
        let provider = get_http_provider(endpoint);

        let any_rpc_block = provider.get_block(BlockId::latest()).hashes().await.unwrap().unwrap();
        let _meta = BlockchainDbMeta::default().with_block(&any_rpc_block.inner);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let idx = U256::from(0u64);
        let value = backend.storage_ref(address, idx).unwrap();
        let account = backend.basic_ref(address).unwrap().unwrap();

        let mem_acc = db.accounts().read().get(&address).unwrap().clone();
        assert_eq!(account.balance, mem_acc.balance);
        assert_eq!(account.nonce, mem_acc.nonce);
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len(), 1);
        assert_eq!(slots.get(&idx).copied().unwrap(), value);

        let num = 10u64;
        let hash = backend.block_hash_ref(num).unwrap();
        let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
        assert_eq!(hash, mem_hash);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let _ = backend.storage_ref(address, idx);
            }
        });
        handle.join().unwrap();
        let slots = db.storage().read().get(&address).unwrap().clone();
        assert_eq!(slots.len() as u64, max_slots);
    }

    #[test]
    fn can_read_cache() {
        let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
        let json = JsonBlockCacheDB::load(cache_path).unwrap();
        assert!(!json.db().accounts.read().is_empty());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_address() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let new_acc = AccountInfo {
            nonce: 1000u64,
            balance: U256::from(2000),
            code: None,
            code_hash: KECCAK_EMPTY,
        };
        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let idx = U256::from(i);
                let result_address = backend.basic_ref(address).unwrap();
                match result_address {
                    Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );

                        let db_address = {
                            let accounts = db.accounts().read();
                            accounts.get(&address).unwrap().clone()
                        };

                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "The nonce was not changed in instance of index {}",
                            idx
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "The balance was not changed in instance of index {}",
                            idx
                        );
                    }
                    None => panic!("Account not found"),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(20), U256::from(10));
        storage_info.insert(U256::from(30), U256::from(15));
        storage_info.insert(U256::from(40), U256::from(20));

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} does not have the same value", index, address
                                )
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_block_hashes() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        // seed the db with some fabricated block hashes
        let mut block_hash_data = BlockHashData::default();
        block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
        block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
        block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
        block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
        block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));

        backend.insert_or_update_block_hashes(block_hash_data.clone());

        let max_slots: u64 = 5;
        let handle = std::thread::spawn(move || {
            for i in 1..max_slots {
                let key = U256::from(i);
                let result_hash = backend.do_get_block_hash(i);
                match result_hash {
                    Ok(hash) => {
                        assert_eq!(
                            hash,
                            *block_hash_data.get(&key).unwrap(),
                            "The hash in block {} did not match",
                            key
                        );

                        let db_result = {
                            let hashes = db.block_hashes().read();
                            *hashes.get(&key).unwrap()
                        };

                        assert_eq!(hash, db_result, "The hash in block {} did not match", key);
                    }
                    Err(err) => panic!("Hash not found, error: {}", err),
                }
            }
        });
        handle.join().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn can_modify_storage_with_cache() {
        let Some(endpoint) = ENDPOINT else { return };

        let provider = get_http_provider(endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        // copy the cache file so the original test data stays untouched
        fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();

        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let db = BlockchainDb::new(meta.clone(), Some(cache_path));
        let backend =
            SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;

        let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        let mut address_data = backend.basic_ref(address).unwrap().unwrap();
        address_data.code = None;

        storage_data.insert(address, storage_info);

        backend.insert_or_update_storage(storage_data.clone());

        let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
        new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));

        let mut account_data = AddressData::default();
        account_data.insert(address, new_acc.clone());

        backend.insert_or_update_address(account_data);

        let backend_clone = backend.clone();

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = backend.do_get_storage(*address, *index);
                        match result_storage {
                            Ok(stg_db) => {
                                assert_eq!(
                                    stg_db, *value,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );

                                let db_result = {
                                    let storage = db.storage().read();
                                    let address_storage = storage.get(address).unwrap();
                                    *address_storage.get(index).unwrap()
                                };

                                assert_eq!(
                                    stg_db, db_result,
                                    "Storage in slot number {} in address {} doesn't have the same value", index, address
                                );
                            }

                            Err(err) => {
                                panic!("There was a database error: {}", err)
                            }
                        }
                    }
                }
            }

            backend_clone.flush_cache();
        });
        handle.join().unwrap();

        // reload the cache from disk and verify that the modified storage was persisted
        let cache_path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");

        let json_db = BlockchainDb::new(meta, Some(cache_path));

        let mut storage_data = StorageData::default();
        let mut storage_info = StorageInfo::default();
        storage_info.insert(U256::from(1), U256::from(10));
        storage_info.insert(U256::from(2), U256::from(15));
        storage_info.insert(U256::from(3), U256::from(20));
        storage_info.insert(U256::from(4), U256::from(20));
        storage_info.insert(U256::from(5), U256::from(15));
        storage_info.insert(U256::from(6), U256::from(10));

        storage_data.insert(address, storage_info);

        let max_slots = 5;
        let handle = std::thread::spawn(move || {
            for _ in 1..max_slots {
                for (address, info) in &storage_data {
                    for (index, value) in info {
                        let result_storage = {
                            let storage = json_db.storage().read();
                            let address_storage = storage.get(address).unwrap().clone();
                            *address_storage.get(index).unwrap()
                        };

                        assert_eq!(
                            result_storage, *value,
                            "Storage in slot number {} in address {} doesn't have the same value",
                            index, address
                        );
                    }
                }
            }
        });

        handle.join().unwrap();

        // clean up the temporary cache file
        fs::remove_file("test-data/storage-tmp.json").unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn shared_backend_any_request() {
        let expected_response_bytes: Bytes = vec![0xff, 0xee].into();
        let server = Server::http("0.0.0.0:0").expect("failed starting in-memory http server");
        let endpoint = format!("http://{}", server.server_addr());

        // serve a single JSON-RPC request and then exit
        let expected_bytes_inner = expected_response_bytes.clone();
        let server_handle = std::thread::spawn(move || {
            #[derive(Debug, Deserialize)]
            struct Request {
                method: String,
            }
            let mut request = server.recv().unwrap();
            let rpc_request: Request =
                serde_json::from_reader(request.as_reader()).expect("failed parsing request");

            match rpc_request.method.as_str() {
                "foo_callCustomMethod" => request
                    .respond(Response::from_string(format!(
                        r#"{{"result": "{}"}}"#,
                        alloy_primitives::hex::encode_prefixed(expected_bytes_inner),
                    )))
                    .unwrap(),
                _ => request
                    .respond(Response::from_string(r#"{"error": "invalid request"}"#))
                    .unwrap(),
            };
        });

        let provider = get_http_provider(&endpoint);
        let meta = BlockchainDbMeta {
            block_env: Default::default(),
            hosts: BTreeSet::from([endpoint.to_string()]),
        };

        let db = BlockchainDb::new(meta, None);
        let provider_inner = provider.clone();
        let mut backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;

        let actual_response_bytes = backend
            .do_any_request(async move {
                let bytes: alloy_primitives::Bytes =
                    provider_inner.raw_request("foo_callCustomMethod".into(), vec!["0001"]).await?;
                Ok(bytes)
            })
            .expect("failed performing any request");

        assert_eq!(actual_response_bytes, expected_response_bytes);

        server_handle.join().unwrap();
    }
}