use crate::{
cache::{BlockchainDb, FlushJsonBlockCacheDB, MemDb, StorageInfo},
error::{DatabaseError, DatabaseResult},
};
use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
use alloy_provider::{network::AnyNetwork, Provider};
use alloy_rpc_types::{Block, BlockId, Transaction};
use alloy_serde::WithOtherFields;
use alloy_transport::Transport;
use eyre::WrapErr;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
stream::Stream,
task::{Context, Poll},
Future, FutureExt,
};
use revm::{
db::DatabaseRef,
primitives::{AccountInfo, Bytecode, HashMap as Map, KECCAK_EMPTY},
};
use rustc_hash::FxHashMap;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
future::IntoFuture,
marker::PhantomData,
path::Path,
pin::Pin,
sync::{
mpsc::{channel as oneshot_channel, Sender as OneshotSender},
Arc,
},
};
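/// Logged when an error suggests that the user is trying to fork from a non-archive node.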
pub const NON_ARCHIVE_NODE_WARNING: &str = "\
It looks like you're trying to fork from an older block with a non-archive node, which is not \
supported. Please try to change your RPC URL to an archive node if the issue persists.";
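// Boxed futures polled by the `BackendHandler`. Each future resolves to the provider response
// paired with the key it was requested for, so the result can be routed back to every listener
// registered for that key.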
type AccountFuture<Err> =
Pin<Box<dyn Future<Output = (Result<(U256, u64, Bytes), Err>, Address)> + Send>>;
type StorageFuture<Err> = Pin<Box<dyn Future<Output = (Result<U256, Err>, Address, U256)> + Send>>;
type BlockHashFuture<Err> = Pin<Box<dyn Future<Output = (Result<B256, Err>, u64)> + Send>>;
type FullBlockFuture<Err> =
Pin<Box<dyn Future<Output = (FullBlockSender, Result<Option<Block>, Err>, BlockId)> + Send>>;
type TransactionFuture<Err> = Pin<
Box<
dyn Future<Output = (TransactionSender, Result<WithOtherFields<Transaction>, Err>, B256)>
+ Send,
>,
>;
type AccountInfoSender = OneshotSender<DatabaseResult<AccountInfo>>;
type StorageSender = OneshotSender<DatabaseResult<U256>>;
type BlockHashSender = OneshotSender<DatabaseResult<B256>>;
type FullBlockSender = OneshotSender<DatabaseResult<Block>>;
type TransactionSender = OneshotSender<DatabaseResult<WithOtherFields<Transaction>>>;
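// Aliases for the map types exchanged in bulk updates and snapshots of the database.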
type AddressData = Map<Address, AccountInfo>;
type StorageData = Map<Address, StorageInfo>;
type BlockHashData = Map<U256, B256>;
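/// Provider requests that are currently in flight, wrapped as futures.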
enum ProviderRequest<Err> {
Account(AccountFuture<Err>),
Storage(StorageFuture<Err>),
BlockHash(BlockHashFuture<Err>),
FullBlock(FullBlockFuture<Err>),
Transaction(TransactionFuture<Err>),
}
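/// Requests that can be sent to the `BackendHandler` by a `SharedBackend`.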
#[derive(Debug)]
enum BackendRequest {
Basic(Address, AccountInfoSender),
Storage(Address, U256, StorageSender),
BlockHash(u64, BlockHashSender),
FullBlock(BlockId, FullBlockSender),
Transaction(B256, TransactionSender),
SetPinnedBlock(BlockId),
UpdateAddress(AddressData),
UpdateStorage(StorageData),
UpdateBlockHash(BlockHashData),
}
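/// Handles an internal provider and listens for requests.
///
/// This handler remains active as long as it is reachable (the request channel is still open) or
/// requests are in progress. It deduplicates identical in-flight requests and writes every
/// response into the shared `BlockchainDb` before notifying the waiting listeners.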
#[must_use = "futures do nothing unless polled"]
pub struct BackendHandler<T, P> {
    provider: P,
    /// Marker for the transport type.
    transport: PhantomData<T>,
    /// The in-memory database that caches all fetched data.
    db: BlockchainDb,
    /// Provider requests currently in flight.
    pending_requests: Vec<ProviderRequest<eyre::Report>>,
    /// Listeners waiting for an account info response, deduplicated by address.
    account_requests: HashMap<Address, Vec<AccountInfoSender>>,
    /// Listeners waiting for a storage slot response, deduplicated by `(address, slot)`.
    storage_requests: HashMap<(Address, U256), Vec<StorageSender>>,
    /// Listeners waiting for a block hash response, deduplicated by block number.
    block_requests: FxHashMap<u64, Vec<BlockHashSender>>,
    /// Incoming requests from all `SharedBackend` clones.
    incoming: Receiver<BackendRequest>,
    /// Received requests that have not been processed yet.
    queued_requests: VecDeque<BackendRequest>,
    /// The block all provider requests are pinned to; `None` means latest.
    block_id: Option<BlockId>,
}
impl<T, P> BackendHandler<T, P>
where
T: Transport + Clone,
P: Provider<T, AnyNetwork> + Clone + Unpin + 'static,
{
fn new(
provider: P,
db: BlockchainDb,
rx: Receiver<BackendRequest>,
block_id: Option<BlockId>,
) -> Self {
Self {
provider,
db,
pending_requests: Default::default(),
account_requests: Default::default(),
storage_requests: Default::default(),
block_requests: Default::default(),
queued_requests: Default::default(),
incoming: rx,
block_id,
transport: PhantomData,
}
}
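    /// Handles a request from a `SharedBackend`, answering from the in-memory database when the
    /// data is already cached and scheduling a provider request otherwise.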
fn on_request(&mut self, req: BackendRequest) {
match req {
BackendRequest::Basic(addr, sender) => {
trace!(target: "backendhandler", "received request basic address={:?}", addr);
let acc = self.db.accounts().read().get(&addr).cloned();
if let Some(basic) = acc {
let _ = sender.send(Ok(basic));
} else {
self.request_account(addr, sender);
}
}
BackendRequest::BlockHash(number, sender) => {
let hash = self.db.block_hashes().read().get(&U256::from(number)).cloned();
if let Some(hash) = hash {
let _ = sender.send(Ok(hash));
} else {
self.request_hash(number, sender);
}
}
BackendRequest::FullBlock(number, sender) => {
self.request_full_block(number, sender);
}
BackendRequest::Transaction(tx, sender) => {
self.request_transaction(tx, sender);
}
BackendRequest::Storage(addr, idx, sender) => {
let value =
self.db.storage().read().get(&addr).and_then(|acc| acc.get(&idx).copied());
if let Some(value) = value {
let _ = sender.send(Ok(value));
} else {
self.request_account_storage(addr, idx, sender);
}
}
BackendRequest::SetPinnedBlock(block_id) => {
self.block_id = Some(block_id);
}
BackendRequest::UpdateAddress(address_data) => {
for (address, data) in address_data {
self.db.accounts().write().insert(address, data);
}
}
BackendRequest::UpdateStorage(storage_data) => {
for (address, data) in storage_data {
self.db.storage().write().insert(address, data);
}
}
BackendRequest::UpdateBlockHash(block_hash_data) => {
for (block, hash) in block_hash_data {
self.db.block_hashes().write().insert(block, hash);
}
}
}
}
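    /// Requests the storage slot from the provider, attaching the listener to an already
    /// in-flight request for the same `(address, idx)` if one exists.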
fn request_account_storage(&mut self, address: Address, idx: U256, listener: StorageSender) {
match self.storage_requests.entry((address, idx)) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(listener);
}
Entry::Vacant(entry) => {
trace!(target: "backendhandler", %address, %idx, "preparing storage request");
entry.insert(vec![listener]);
let provider = self.provider.clone();
let block_id = self.block_id.unwrap_or_default();
let fut = Box::pin(async move {
let storage = provider
.get_storage_at(address, idx)
.block_id(block_id)
.await
.map_err(Into::into);
(storage, address, idx)
});
self.pending_requests.push(ProviderRequest::Storage(fut));
}
}
}
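    /// Returns a future that fetches the account's balance, nonce, and code in one joined
    /// request.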
fn get_account_req(&self, address: Address) -> ProviderRequest<eyre::Report> {
trace!(target: "backendhandler", "preparing account request, address={:?}", address);
let provider = self.provider.clone();
let block_id = self.block_id.unwrap_or_default();
let fut = Box::pin(async move {
let balance = provider.get_balance(address).block_id(block_id).into_future();
let nonce = provider.get_transaction_count(address).block_id(block_id).into_future();
let code = provider.get_code_at(address).block_id(block_id).into_future();
let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into);
(resp, address)
});
ProviderRequest::Account(fut)
}
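    /// Requests the account info, attaching the listener to an already in-flight request for the
    /// same address if one exists.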
fn request_account(&mut self, address: Address, listener: AccountInfoSender) {
match self.account_requests.entry(address) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(listener);
}
Entry::Vacant(entry) => {
entry.insert(vec![listener]);
self.pending_requests.push(self.get_account_req(address));
}
}
}
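    /// Fetches the entire block, including its transactions.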
fn request_full_block(&mut self, number: BlockId, sender: FullBlockSender) {
let provider = self.provider.clone();
let fut = Box::pin(async move {
let block = provider
.get_block(number, true.into())
.await
                .wrap_err_with(|| format!("could not fetch block {number:?}"));
(sender, block, number)
});
self.pending_requests.push(ProviderRequest::FullBlock(fut));
}
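    /// Fetches the transaction for the given hash; a missing transaction is surfaced as an error.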
fn request_transaction(&mut self, tx: B256, sender: TransactionSender) {
let provider = self.provider.clone();
let fut = Box::pin(async move {
let block = provider
.get_transaction_by_hash(tx)
.await
.wrap_err_with(|| format!("could not get transaction {tx}"))
.and_then(|maybe| {
maybe.ok_or_else(|| eyre::eyre!("could not get transaction {tx}"))
});
(sender, block, tx)
});
self.pending_requests.push(ProviderRequest::Transaction(fut));
}
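    /// Requests the hash for the given block number, attaching the listener to an already
    /// in-flight request if one exists. A block that cannot be found resolves to `KECCAK_EMPTY`
    /// instead of an error.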
fn request_hash(&mut self, number: u64, listener: BlockHashSender) {
match self.block_requests.entry(number) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(listener);
}
Entry::Vacant(entry) => {
trace!(target: "backendhandler", number, "preparing block hash request");
entry.insert(vec![listener]);
let provider = self.provider.clone();
let fut = Box::pin(async move {
let block = provider
.get_block_by_number(number.into(), false)
.await
.wrap_err("failed to get block");
let block_hash = match block {
Ok(Some(block)) => Ok(block
.header
.hash
.expect("empty block hash on mined block, this should never happen")),
Ok(None) => {
warn!(target: "backendhandler", ?number, "block not found");
Ok(KECCAK_EMPTY)
}
Err(err) => {
error!(target: "backendhandler", %err, ?number, "failed to get block");
Err(err)
}
};
(block_hash, number)
});
self.pending_requests.push(ProviderRequest::BlockHash(fut));
}
}
}
}
impl<T, P> Future for BackendHandler<T, P>
where
T: Transport + Clone + Unpin,
P: Provider<T, AnyNetwork> + Clone + Unpin + 'static,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();
loop {
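            // process all requests buffered since the last poll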
while let Some(req) = pin.queued_requests.pop_front() {
pin.on_request(req)
}
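            // receive new requests and queue them until the channel is exhausted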
loop {
match Pin::new(&mut pin.incoming).poll_next(cx) {
Poll::Ready(Some(req)) => {
pin.queued_requests.push_back(req);
}
Poll::Ready(None) => {
trace!(target: "backendhandler", "last sender dropped, ready to drop (&flush cache)");
return Poll::Ready(());
}
Poll::Pending => break,
}
}
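            // advance all provider requests that are currently in flight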
for n in (0..pin.pending_requests.len()).rev() {
let mut request = pin.pending_requests.swap_remove(n);
match &mut request {
ProviderRequest::Account(fut) => {
if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
let (balance, nonce, code) = match resp {
Ok(res) => res,
Err(err) => {
let err = Arc::new(err);
if let Some(listeners) = pin.account_requests.remove(&addr) {
listeners.into_iter().for_each(|l| {
let _ = l.send(Err(DatabaseError::GetAccount(
addr,
Arc::clone(&err),
)));
})
}
continue;
}
};
                            // an empty code response hashes to `KECCAK_EMPTY`, so skip re-hashing
                            let code_hash =
                                if code.is_empty() { KECCAK_EMPTY } else { keccak256(&code) };
let acc = AccountInfo {
nonce,
balance,
code: Some(Bytecode::new_raw(code)),
code_hash,
};
pin.db.accounts().write().insert(addr, acc.clone());
if let Some(listeners) = pin.account_requests.remove(&addr) {
listeners.into_iter().for_each(|l| {
let _ = l.send(Ok(acc.clone()));
})
}
continue;
}
}
ProviderRequest::Storage(fut) => {
if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
let value = match resp {
Ok(value) => value,
Err(err) => {
let err = Arc::new(err);
if let Some(listeners) =
pin.storage_requests.remove(&(addr, idx))
{
listeners.into_iter().for_each(|l| {
let _ = l.send(Err(DatabaseError::GetStorage(
addr,
idx,
Arc::clone(&err),
)));
})
}
continue;
}
};
pin.db.storage().write().entry(addr).or_default().insert(idx, value);
if let Some(listeners) = pin.storage_requests.remove(&(addr, idx)) {
listeners.into_iter().for_each(|l| {
let _ = l.send(Ok(value));
})
}
continue;
}
}
ProviderRequest::BlockHash(fut) => {
if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
let value = match block_hash {
Ok(value) => value,
Err(err) => {
let err = Arc::new(err);
if let Some(listeners) = pin.block_requests.remove(&number) {
listeners.into_iter().for_each(|l| {
let _ = l.send(Err(DatabaseError::GetBlockHash(
number,
Arc::clone(&err),
)));
})
}
continue;
}
};
pin.db.block_hashes().write().insert(U256::from(number), value);
if let Some(listeners) = pin.block_requests.remove(&number) {
listeners.into_iter().for_each(|l| {
let _ = l.send(Ok(value));
})
}
continue;
}
}
ProviderRequest::FullBlock(fut) => {
if let Poll::Ready((sender, resp, number)) = fut.poll_unpin(cx) {
let msg = match resp {
Ok(Some(block)) => Ok(block),
Ok(None) => Err(DatabaseError::BlockNotFound(number)),
Err(err) => {
let err = Arc::new(err);
Err(DatabaseError::GetFullBlock(number, err))
}
};
let _ = sender.send(msg);
continue;
}
}
ProviderRequest::Transaction(fut) => {
if let Poll::Ready((sender, tx, tx_hash)) = fut.poll_unpin(cx) {
let msg = match tx {
Ok(tx) => Ok(tx),
Err(err) => {
let err = Arc::new(err);
Err(DatabaseError::GetTransaction(tx_hash, err))
}
};
let _ = sender.send(msg);
continue;
}
}
}
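                // the future is not ready yet; reinsert it and try again on the next poll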
pin.pending_requests.push(request);
}
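            // everything was drained; if nothing new was queued, yield until woken again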
if pin.queued_requests.is_empty() {
return Poll::Pending;
}
}
}
}
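/// Determines how a `SharedBackend` blocks while waiting for a response from the
/// `BackendHandler`.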
#[derive(Default, Clone, Debug, PartialEq)]
pub enum BlockingMode {
    /// Wrap the blocking call in `tokio::task::block_in_place`, which is required when running
    /// inside a multi-threaded tokio runtime.
    #[default]
    BlockInPlace,
    /// Run the blocking call directly; suitable for threads without an active tokio runtime.
    Block,
}
impl BlockingMode {
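    /// Runs the given blocking closure according to the selected mode.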
pub fn run<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
match self {
Self::BlockInPlace => tokio::task::block_in_place(f),
Self::Block => f(),
}
}
}
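/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend is connected to the `BackendHandler` via an mpsc channel. The handler is spawned
/// on a tokio task or a dedicated thread and listens for incoming requests from all
/// `SharedBackend` clones, deduplicating identical in-flight requests. Since every clone holds a
/// sender half of the channel, the handler stays alive until the last clone is dropped.
///
/// A minimal usage sketch; the endpoint, `meta`, and `address` are placeholders, and
/// `get_http_provider` is the helper defined in the tests below:
///
/// ```ignore
/// let provider = get_http_provider("http://localhost:8545"); // hypothetical endpoint
/// let db = BlockchainDb::new(meta, None);
/// let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
/// // `DatabaseRef` calls fetch from the endpoint on a cache miss and memoize the result:
/// let acc = backend.basic_ref(address)?;
/// ```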
#[derive(Clone, Debug)]
pub struct SharedBackend {
    /// Channel used to send requests to the `BackendHandler`.
    backend: Sender<BackendRequest>,
    /// Keeps the underlying cache alive so it is flushed once the last `SharedBackend` is
    /// dropped.
    cache: Arc<FlushJsonBlockCacheDB>,
    /// How database lookups block on responses from the handler.
    blocking_mode: BlockingMode,
}
impl SharedBackend {
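    /// Spawns a new `BackendHandler` on a tokio task that listens for requests from all
    /// `SharedBackend` clones.
    ///
    /// NOTE: this should be called with an `Arc`-wrapped provider, so the required clone is cheap.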
pub async fn spawn_backend<T, P>(
provider: P,
db: BlockchainDb,
pin_block: Option<BlockId>,
) -> Self
where
T: Transport + Clone + Unpin,
P: Provider<T, AnyNetwork> + Unpin + 'static + Clone,
{
let (shared, handler) = Self::new(provider, db, pin_block);
trace!(target: "backendhandler", "spawning Backendhandler task");
tokio::spawn(handler);
shared
}
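    /// Same as `spawn_backend`, but runs the `BackendHandler` on a dedicated thread with its own
    /// single-threaded tokio runtime, which is useful when no async runtime is available at the
    /// call site.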
pub fn spawn_backend_thread<T, P>(
provider: P,
db: BlockchainDb,
pin_block: Option<BlockId>,
) -> Self
where
T: Transport + Clone + Unpin,
P: Provider<T, AnyNetwork> + Unpin + 'static + Clone,
{
let (shared, handler) = Self::new(provider, db, pin_block);
std::thread::Builder::new()
.name("fork-backend".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime");
rt.block_on(handler);
})
.expect("failed to spawn thread");
trace!(target: "backendhandler", "spawned Backendhandler thread");
shared
}
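    /// Returns a new `SharedBackend` together with the `BackendHandler` that serves it.
    ///
    /// The caller is responsible for driving the handler (for example by spawning it); no request
    /// makes progress until the handler is polled.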
pub fn new<T, P>(
provider: P,
db: BlockchainDb,
pin_block: Option<BlockId>,
) -> (Self, BackendHandler<T, P>)
where
T: Transport + Clone + Unpin,
P: Provider<T, AnyNetwork> + Unpin + 'static + Clone,
{
let (backend, backend_rx) = channel(1);
let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
(Self { backend, cache, blocking_mode: Default::default() }, handler)
}
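    /// Returns a new `SharedBackend` that uses the given blocking mode.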
pub fn with_blocking_mode(&self, mode: BlockingMode) -> Self {
Self { backend: self.backend.clone(), cache: self.cache.clone(), blocking_mode: mode }
}
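    /// Updates the pinned block that all subsequent provider requests are fetched at.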
pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
let req = BackendRequest::SetPinnedBlock(block.into());
self.backend.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))
}
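    /// Returns the full block for the given block identifier, fetching it via the handler.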
pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<Block> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::FullBlock(block.into(), sender);
self.backend.clone().try_send(req)?;
rx.recv()?
})
}
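    /// Returns the transaction for the given hash, fetching it via the handler.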
pub fn get_transaction(&self, tx: B256) -> DatabaseResult<WithOtherFields<Transaction>> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Transaction(tx, sender);
self.backend.clone().try_send(req)?;
rx.recv()?
})
}
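    /// Fetches the account info for the address, blocking until the handler responds.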
fn do_get_basic(&self, address: Address) -> DatabaseResult<Option<AccountInfo>> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Basic(address, sender);
self.backend.clone().try_send(req)?;
rx.recv()?.map(Some)
})
}
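    /// Fetches the value of the storage slot, blocking until the handler responds.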
fn do_get_storage(&self, address: Address, index: U256) -> DatabaseResult<U256> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Storage(address, index, sender);
self.backend.clone().try_send(req)?;
rx.recv()?
})
}
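    /// Fetches the hash of the given block number, blocking until the handler responds.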
fn do_get_block_hash(&self, number: u64) -> DatabaseResult<B256> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::BlockHash(number, sender);
self.backend.clone().try_send(req)?;
rx.recv()?
})
}
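    /// Inserts or updates account data in the handler's database; send failures are logged rather
    /// than returned.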
pub fn insert_or_update_address(&self, address_data: AddressData) {
let req = BackendRequest::UpdateAddress(address_data);
        if let Err(e) = self.backend.clone().try_send(req) {
            error!(target: "sharedbackend", "Failed to send update address request: {:?}", e);
        }
}
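    /// Inserts or updates storage data in the handler's database; send failures are logged rather
    /// than returned.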
pub fn insert_or_update_storage(&self, storage_data: StorageData) {
let req = BackendRequest::UpdateStorage(storage_data);
        if let Err(e) = self.backend.clone().try_send(req) {
            error!(target: "sharedbackend", "Failed to send update storage request: {:?}", e);
        }
}
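    /// Inserts or updates block hashes in the handler's database; send failures are logged rather
    /// than returned.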
pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
let req = BackendRequest::UpdateBlockHash(block_hash_data);
        if let Err(e) = self.backend.clone().try_send(req) {
            error!(target: "sharedbackend", "Failed to send update block hash request: {:?}", e);
        }
}
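    /// Flushes the cached data to disk if a cache path is configured.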
pub fn flush_cache(&self) {
self.cache.0.flush();
}
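    /// Flushes the cached data to the given path.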
pub fn flush_cache_to(&self, cache_path: &Path) {
self.cache.0.flush_to(cache_path);
}
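    /// Returns a handle to the shared in-memory database. The accessors below return cloned
    /// snapshots of its accounts, storage, and block hashes.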
pub fn data(&self) -> Arc<MemDb> {
self.cache.0.db().clone()
}
pub fn accounts(&self) -> AddressData {
self.cache.0.db().accounts.read().clone()
}
pub fn accounts_len(&self) -> usize {
self.cache.0.db().accounts.read().len()
}
pub fn storage(&self) -> StorageData {
self.cache.0.db().storage.read().clone()
}
pub fn storage_len(&self) -> usize {
self.cache.0.db().storage.read().len()
}
pub fn block_hashes(&self) -> BlockHashData {
self.cache.0.db().block_hashes.read().clone()
}
pub fn block_hashes_len(&self) -> usize {
self.cache.0.db().block_hashes.read().len()
}
}
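/// `DatabaseRef` implementation that transparently fetches missing data from the remote endpoint
/// and caches it in the shared database.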
impl DatabaseRef for SharedBackend {
type Error = DatabaseError;
fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
trace!(target: "sharedbackend", %address, "request basic");
self.do_get_basic(address).map_err(|err| {
error!(target: "sharedbackend", %err, %address, "Failed to send/recv `basic`");
if err.is_possibly_non_archive_node_error() {
error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
}
err
})
}
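    /// Bytecode is fetched together with the account in `basic_ref`, so a lookup by code hash
    /// alone cannot be served and always errors with `MissingCode`.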
fn code_by_hash_ref(&self, hash: B256) -> Result<Bytecode, Self::Error> {
Err(DatabaseError::MissingCode(hash))
}
fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
trace!(target: "sharedbackend", "request storage {:?} at {:?}", address, index);
self.do_get_storage(address, index).map_err(|err| {
error!(target: "sharedbackend", %err, %address, %index, "Failed to send/recv `storage`");
if err.is_possibly_non_archive_node_error() {
error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
}
err
})
}
fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
trace!(target: "sharedbackend", "request block hash for number {:?}", number);
self.do_get_block_hash(number).map_err(|err| {
error!(target: "sharedbackend", %err, %number, "Failed to send/recv `block_hash`");
if err.is_possibly_non_archive_node_error() {
error!(target: "sharedbackend", "{NON_ARCHIVE_NODE_WARNING}");
}
err
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cache::{BlockchainDbMeta, JsonBlockCacheDB};
use alloy_provider::{ProviderBuilder, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_transport_http::{Client, Http};
use std::{collections::BTreeSet, fs, path::PathBuf};
pub fn get_http_provider(endpoint: &str) -> RootProvider<Http<Client>, AnyNetwork> {
ProviderBuilder::new()
.network::<AnyNetwork>()
.on_client(ClientBuilder::default().http(endpoint.parse().unwrap()))
}
const ENDPOINT: Option<&str> = option_env!("ETH_RPC_URL");
#[tokio::test(flavor = "multi_thread")]
async fn shared_backend() {
let Some(endpoint) = ENDPOINT else { return };
let provider = get_http_provider(endpoint);
let meta = BlockchainDbMeta {
cfg_env: Default::default(),
block_env: Default::default(),
hosts: BTreeSet::from([endpoint.to_string()]),
};
let db = BlockchainDb::new(meta, None);
let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
let idx = U256::from(0u64);
let value = backend.storage_ref(address, idx).unwrap();
let account = backend.basic_ref(address).unwrap().unwrap();
let mem_acc = db.accounts().read().get(&address).unwrap().clone();
assert_eq!(account.balance, mem_acc.balance);
assert_eq!(account.nonce, mem_acc.nonce);
let slots = db.storage().read().get(&address).unwrap().clone();
assert_eq!(slots.len(), 1);
assert_eq!(slots.get(&idx).copied().unwrap(), value);
let num = 10u64;
let hash = backend.block_hash_ref(num).unwrap();
let mem_hash = *db.block_hashes().read().get(&U256::from(num)).unwrap();
assert_eq!(hash, mem_hash);
let max_slots = 5;
let handle = std::thread::spawn(move || {
for i in 1..max_slots {
let idx = U256::from(i);
let _ = backend.storage_ref(address, idx);
}
});
handle.join().unwrap();
let slots = db.storage().read().get(&address).unwrap().clone();
assert_eq!(slots.len() as u64, max_slots);
}
#[test]
fn can_read_cache() {
let cache_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage.json");
let json = JsonBlockCacheDB::load(cache_path).unwrap();
assert!(!json.db().accounts.read().is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn can_modify_address() {
let Some(endpoint) = ENDPOINT else { return };
let provider = get_http_provider(endpoint);
let meta = BlockchainDbMeta {
cfg_env: Default::default(),
block_env: Default::default(),
hosts: BTreeSet::from([endpoint.to_string()]),
};
let db = BlockchainDb::new(meta, None);
let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
let new_acc = AccountInfo {
nonce: 1000u64,
balance: U256::from(2000),
code: None,
code_hash: KECCAK_EMPTY,
};
let mut account_data: AddressData = Map::new();
account_data.insert(address, new_acc.clone());
backend.insert_or_update_address(account_data);
let max_slots = 5;
let handle = std::thread::spawn(move || {
for i in 1..max_slots {
let idx = U256::from(i);
let result_address = backend.basic_ref(address).unwrap();
match result_address {
Some(acc) => {
                        assert_eq!(
                            acc.nonce, new_acc.nonce,
                            "the returned nonce does not match the update at iteration {}",
                            idx
                        );
                        assert_eq!(
                            acc.balance, new_acc.balance,
                            "the returned balance does not match the update at iteration {}",
                            idx
                        );
let db_address = {
let accounts = db.accounts().read();
accounts.get(&address).unwrap().clone()
};
                        assert_eq!(
                            db_address.nonce, new_acc.nonce,
                            "the database nonce does not match the update at iteration {}",
                            idx
                        );
                        assert_eq!(
                            db_address.balance, new_acc.balance,
                            "the database balance does not match the update at iteration {}",
                            idx
                        );
}
None => panic!("Account not found"),
}
}
});
handle.join().unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn can_modify_storage() {
let Some(endpoint) = ENDPOINT else { return };
let provider = get_http_provider(endpoint);
let meta = BlockchainDbMeta {
cfg_env: Default::default(),
block_env: Default::default(),
hosts: BTreeSet::from([endpoint.to_string()]),
};
let db = BlockchainDb::new(meta, None);
let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
let mut storage_data: StorageData = Map::new();
let mut storage_info: StorageInfo = Map::new();
storage_info.insert(U256::from(20), U256::from(10));
storage_info.insert(U256::from(30), U256::from(15));
storage_info.insert(U256::from(40), U256::from(20));
storage_data.insert(address, storage_info);
backend.insert_or_update_storage(storage_data.clone());
let max_slots = 5;
let handle = std::thread::spawn(move || {
for _ in 1..max_slots {
for (address, info) in &storage_data {
for (index, value) in info {
let result_storage = backend.do_get_storage(*address, *index);
match result_storage {
Ok(stg_db) => {
                                    assert_eq!(
                                        stg_db, *value,
                                        "storage slot {} of address {} does not have the expected value", index, address
                                    );
let db_result = {
let storage = db.storage().read();
let address_storage = storage.get(address).unwrap();
*address_storage.get(index).unwrap()
};
                                    assert_eq!(
                                        stg_db, db_result,
                                        "storage slot {} of address {} does not match the database value", index, address
                                    )
}
Err(err) => {
panic!("There was a database error: {}", err)
}
}
}
}
}
});
handle.join().unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn can_modify_block_hashes() {
let Some(endpoint) = ENDPOINT else { return };
let provider = get_http_provider(endpoint);
let meta = BlockchainDbMeta {
cfg_env: Default::default(),
block_env: Default::default(),
hosts: BTreeSet::from([endpoint.to_string()]),
};
let db = BlockchainDb::new(meta, None);
let backend = SharedBackend::spawn_backend(Arc::new(provider), db.clone(), None).await;
let mut block_hash_data: BlockHashData = Map::new();
block_hash_data.insert(U256::from(1), B256::from(U256::from(1)));
block_hash_data.insert(U256::from(2), B256::from(U256::from(2)));
block_hash_data.insert(U256::from(3), B256::from(U256::from(3)));
block_hash_data.insert(U256::from(4), B256::from(U256::from(4)));
block_hash_data.insert(U256::from(5), B256::from(U256::from(5)));
backend.insert_or_update_block_hashes(block_hash_data.clone());
let max_slots: u64 = 5;
let handle = std::thread::spawn(move || {
for i in 1..max_slots {
let key = U256::from(i);
let result_hash = backend.do_get_block_hash(i);
match result_hash {
Ok(hash) => {
assert_eq!(
hash,
*block_hash_data.get(&key).unwrap(),
"The hash in block {} did not match",
key
);
let db_result = {
let hashes = db.block_hashes().read();
*hashes.get(&key).unwrap()
};
assert_eq!(hash, db_result, "The hash in block {} did not match", key);
}
Err(err) => panic!("Hash not found, error: {}", err),
}
}
});
handle.join().unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn can_modify_storage_with_cache() {
let Some(endpoint) = ENDPOINT else { return };
let provider = get_http_provider(endpoint);
let meta = BlockchainDbMeta {
cfg_env: Default::default(),
block_env: Default::default(),
hosts: BTreeSet::from([endpoint.to_string()]),
};
fs::copy("test-data/storage.json", "test-data/storage-tmp.json").unwrap();
let cache_path =
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
let db = BlockchainDb::new(meta.clone(), Some(cache_path));
let backend =
SharedBackend::spawn_backend(Arc::new(provider.clone()), db.clone(), None).await;
let address: Address = "63091244180ae240c87d1f528f5f269134cb07b3".parse().unwrap();
let mut storage_data: StorageData = Map::new();
let mut storage_info: StorageInfo = Map::new();
storage_info.insert(U256::from(1), U256::from(10));
storage_info.insert(U256::from(2), U256::from(15));
storage_info.insert(U256::from(3), U256::from(20));
storage_info.insert(U256::from(4), U256::from(20));
storage_info.insert(U256::from(5), U256::from(15));
storage_info.insert(U256::from(6), U256::from(10));
let mut address_data = backend.basic_ref(address).unwrap().unwrap();
address_data.code = None;
storage_data.insert(address, storage_info);
backend.insert_or_update_storage(storage_data.clone());
let mut new_acc = backend.basic_ref(address).unwrap().unwrap();
new_acc.code = Some(Bytecode::new_raw(([10, 20, 30, 40]).into()));
let mut account_data: AddressData = Map::new();
account_data.insert(address, new_acc.clone());
backend.insert_or_update_address(account_data);
let backend_clone = backend.clone();
let max_slots = 5;
let handle = std::thread::spawn(move || {
for _ in 1..max_slots {
for (address, info) in &storage_data {
for (index, value) in info {
let result_storage = backend.do_get_storage(*address, *index);
match result_storage {
Ok(stg_db) => {
                                    assert_eq!(
                                        stg_db, *value,
                                        "storage slot {} of address {} does not have the expected value", index, address
                                    );
let db_result = {
let storage = db.storage().read();
let address_storage = storage.get(address).unwrap();
*address_storage.get(index).unwrap()
};
                                    assert_eq!(
                                        stg_db, db_result,
                                        "storage slot {} of address {} does not match the database value", index, address
                                    );
}
Err(err) => {
panic!("There was a database error: {}", err)
}
}
}
}
}
backend_clone.flush_cache();
});
handle.join().unwrap();
let cache_path =
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/storage-tmp.json");
let json_db = BlockchainDb::new(meta, Some(cache_path));
let mut storage_data: StorageData = Map::new();
let mut storage_info: StorageInfo = Map::new();
storage_info.insert(U256::from(1), U256::from(10));
storage_info.insert(U256::from(2), U256::from(15));
storage_info.insert(U256::from(3), U256::from(20));
storage_info.insert(U256::from(4), U256::from(20));
storage_info.insert(U256::from(5), U256::from(15));
storage_info.insert(U256::from(6), U256::from(10));
storage_data.insert(address, storage_info);
let max_slots = 5;
let handle = std::thread::spawn(move || {
for _ in 1..max_slots {
for (address, info) in &storage_data {
for (index, value) in info {
let result_storage = {
let storage = json_db.storage().read();
let address_storage = storage.get(address).unwrap().clone();
*address_storage.get(index).unwrap()
};
                        assert_eq!(
                            result_storage, *value,
                            "storage slot {} of address {} does not match the value reloaded from the cache",
                            index, address
                        );
}
}
}
});
handle.join().unwrap();
fs::remove_file("test-data/storage-tmp.json").unwrap();
}
}