use std::thread::{self, JoinHandle};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use ethers::{abi::AbiDecode, types::ValueOrArray};
use revm::{
db::AccountState,
inspector_handle_register,
primitives::{Env, HashMap, B256},
};
use tokio::sync::broadcast::channel;
use super::*;
#[cfg_attr(doc, doc(hidden))]
#[cfg_attr(doc, allow(unused_imports))]
#[cfg(doc)]
use crate::middleware::ArbiterMiddleware;
use crate::{
console::abi::HardhatConsoleCalls, database::inspector::ArbiterInspector,
middleware::connection::revm_logs_to_ethers_logs,
};
pub mod instruction;
use instruction::*;
/// Sending half of the channel that carries [`Instruction`]s to the environment thread.
pub(crate) type InstructionSender = Sender<Instruction>;
/// Receiving half of the [`Instruction`] channel; drained by the loop spawned in `run`.
pub(crate) type InstructionReceiver = Receiver<Instruction>;
/// Sending half of a per-instruction reply channel carrying either an [`Outcome`]
/// or an [`ArbiterCoreError`].
pub(crate) type OutcomeSender = Sender<Result<Outcome, ArbiterCoreError>>;
/// Receiving half of a per-instruction reply channel (see [`OutcomeSender`]).
pub(crate) type OutcomeReceiver = Receiver<Result<Outcome, ArbiterCoreError>>;
/// An EVM sandbox that processes [`Instruction`]s on a dedicated thread.
///
/// An `Environment` is obtained through [`Environment::builder`] followed by
/// `build`, which creates the value and immediately starts its worker thread.
#[derive(Debug)]
pub struct Environment {
    /// Configuration applied to the EVM when the worker thread is started.
    pub parameters: EnvironmentParameters,
    /// Database handed (as a clone) to the EVM running on the worker thread.
    pub(crate) db: ArbiterDB,
    /// Inspector installed as the EVM's external context; taken out of the
    /// option when the worker thread is started.
    inspector: Option<ArbiterInspector>,
    /// Channels used to send instructions to, and broadcast events from, the
    /// worker thread.
    pub(crate) socket: Socket,
    /// Join handle of the worker thread; `None` until the thread is spawned.
    pub(crate) handle: Option<JoinHandle<Result<(), ArbiterCoreError>>>,
}
/// User-facing configuration of an [`Environment`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct EnvironmentParameters {
    /// Optional label; included in trace output and in the message logged on stop.
    pub label: Option<String>,
    /// Block gas limit for the EVM; `U256::MAX` is used when this is `None`.
    pub gas_limit: Option<U256>,
    /// Value assigned to the EVM's `limit_contract_code_size` configuration.
    pub contract_size_limit: Option<usize>,
    /// Forwarded to the inspector as its console-log flag.
    pub console_logs: bool,
    /// Forwarded to the inspector as its gas flag.
    pub pay_gas: bool,
}
/// Builder that collects the parameters and database for an [`Environment`]
/// before it is created and started by `build`.
pub struct EnvironmentBuilder {
    /// Parameters the environment will be created with.
    parameters: EnvironmentParameters,
    /// Database the environment will be created with.
    db: ArbiterDB,
}
impl EnvironmentBuilder {
pub fn build(self) -> Environment {
Environment::create(self.parameters, self.db).run()
}
pub fn with_label(mut self, label: impl Into<String>) -> Self {
self.parameters.label = Some(label.into());
self
}
pub fn with_gas_limit(mut self, gas_limit: U256) -> Self {
self.parameters.gas_limit = Some(gas_limit);
self
}
pub fn with_contract_size_limit(mut self, contract_size_limit: usize) -> Self {
self.parameters.contract_size_limit = Some(contract_size_limit);
self
}
pub fn with_state(mut self, state: impl Into<CacheDB<EmptyDB>>) -> Self {
self.db.state = Arc::new(RwLock::new(state.into()));
self
}
pub fn with_logs(
mut self,
logs: impl Into<std::collections::HashMap<U256, Vec<eLog>>>,
) -> Self {
self.db.logs = Arc::new(RwLock::new(logs.into()));
self
}
pub fn with_arbiter_db(mut self, db: ArbiterDB) -> Self {
self.db = db;
self
}
pub fn with_console_logs(mut self) -> Self {
self.parameters.console_logs = true;
self
}
pub fn with_pay_gas(mut self) -> Self {
self.parameters.pay_gas = true;
self
}
}
impl Environment {
/// Returns an [`EnvironmentBuilder`] holding default parameters and a
/// default database.
pub fn builder() -> EnvironmentBuilder {
    let parameters = EnvironmentParameters::default();
    let db = ArbiterDB::default();
    EnvironmentBuilder { parameters, db }
}
/// Assembles an [`Environment`] from its parameters and database without
/// starting the worker thread (`handle` is left as `None`).
///
/// Sets up an unbounded instruction channel and a broadcast channel with a
/// capacity of 512 messages, and creates the inspector from the
/// `console_logs` and `pay_gas` flags.
fn create(parameters: EnvironmentParameters, db: ArbiterDB) -> Self {
    let (instruction_sender, instruction_receiver) = unbounded();
    // The initial broadcast receiver is discarded; only the sender is stored.
    let (event_broadcaster, _) = channel(512);
    let socket = Socket {
        instruction_sender: Arc::new(instruction_sender),
        instruction_receiver,
        event_broadcaster,
    };

    // The flags are passed straight through: when neither is set they are
    // both `false`, so no separate "disabled" branch is needed.
    let inspector = Some(ArbiterInspector::new(
        parameters.console_logs,
        parameters.pay_gas,
    ));

    Self {
        socket,
        inspector,
        parameters,
        db,
        handle: None,
    }
}
fn run(mut self) -> Self {
let label = self.parameters.label.clone();
let db = self.db.clone();
let mut env = Env::default();
env.cfg.limit_contract_code_size = self.parameters.contract_size_limit;
env.block.gas_limit = self.parameters.gas_limit.unwrap_or(U256::MAX);
let inspector = self.inspector.take().unwrap();
let instruction_receiver = self.socket.instruction_receiver.clone();
let event_broadcaster = self.socket.event_broadcaster.clone();
let handle = thread::spawn(move || {
let mut evm = Evm::builder()
.with_db(db.clone())
.with_env(Box::new(env))
.with_external_context(inspector)
.append_handler_register(inspector_handle_register)
.build();
let mut transaction_index = U64::from(0_u64);
let mut cumulative_gas_per_block = eU256::from(0);
while let Ok(instruction) = instruction_receiver.recv() {
trace!(
"Instruction {:?} received by environment labeled: {:?}",
instruction,
label
);
match instruction {
Instruction::AddAccount {
address,
outcome_sender,
} => {
let recast_address = Address::from(address.as_fixed_bytes());
let account = revm::db::DbAccount {
info: AccountInfo::default(),
account_state: AccountState::None,
storage: HashMap::new(),
};
match db.state.write()?.accounts.insert(recast_address, account) {
None => outcome_sender.send(Ok(Outcome::AddAccountCompleted))?,
Some(_) => {
outcome_sender.send(Err(ArbiterCoreError::AccountCreationError))?;
}
}
}
Instruction::BlockUpdate {
block_number,
block_timestamp,
outcome_sender,
} => {
let old_block_number = evm.block().number;
let receipt_data = ReceiptData {
block_number: convert_uint_to_u64(old_block_number)?,
transaction_index,
cumulative_gas_per_block,
};
evm.block_mut().number = U256::from_limbs(block_number.0);
evm.block_mut().timestamp = U256::from_limbs(block_timestamp.0);
transaction_index = U64::from(0);
cumulative_gas_per_block = eU256::from(0);
outcome_sender.send(Ok(Outcome::BlockUpdateCompleted(receipt_data)))?;
}
Instruction::Cheatcode {
cheatcode,
outcome_sender,
} => match cheatcode {
Cheatcodes::Load {
account,
key,
block: _,
} => {
let recast_address = Address::from(account.as_fixed_bytes());
let recast_key = B256::from(key.as_fixed_bytes()).into();
match db.state.write()?.accounts.get_mut(&recast_address) {
Some(account) => {
let value: U256 = match account.storage.get::<U256>(&recast_key)
{
Some(value) => *value,
None => U256::ZERO,
};
outcome_sender.send(Ok(Outcome::CheatcodeReturn(
CheatcodesReturn::Load { value },
)))?;
}
None => {
outcome_sender
.send(Err(ArbiterCoreError::AccountDoesNotExistError))?;
}
};
}
Cheatcodes::Store {
account,
key,
value,
} => {
let recast_address = Address::from(account.as_fixed_bytes());
let recast_key = B256::from(key.as_fixed_bytes());
let recast_value = B256::from(value.as_fixed_bytes());
match db.state.write()?.accounts.get_mut(&recast_address) {
Some(account) => {
account
.storage
.insert(recast_key.into(), recast_value.into());
outcome_sender.send(Ok(Outcome::CheatcodeReturn(
CheatcodesReturn::Store,
)))?;
}
None => {
outcome_sender
.send(Err(ArbiterCoreError::AccountDoesNotExistError))?;
}
};
}
Cheatcodes::Deal { address, amount } => {
let recast_address = Address::from(address.as_fixed_bytes());
match db.state.write()?.accounts.get_mut(&recast_address) {
Some(account) => {
account.info.balance += U256::from_limbs(amount.0);
outcome_sender.send(Ok(Outcome::CheatcodeReturn(
CheatcodesReturn::Deal,
)))?;
}
None => {
outcome_sender
.send(Err(ArbiterCoreError::AccountDoesNotExistError))?;
}
};
}
Cheatcodes::Access { address } => {
let recast_address = Address::from(address.as_fixed_bytes());
match db.state.write()?.accounts.get(&recast_address) {
Some(account) => {
let account_state = match account.account_state {
AccountState::None => AccountStateSerializable::None,
AccountState::Touched => AccountStateSerializable::Touched,
AccountState::StorageCleared => {
AccountStateSerializable::StorageCleared
}
AccountState::NotExisting => {
AccountStateSerializable::NotExisting
}
};
let account = CheatcodesReturn::Access {
account_state,
info: account.info.clone(),
storage: account.storage.clone(),
};
outcome_sender.send(Ok(Outcome::CheatcodeReturn(account)))?;
}
None => {
outcome_sender
.send(Err(ArbiterCoreError::AccountDoesNotExistError))?;
}
}
}
},
Instruction::Call {
tx_env,
outcome_sender,
} => {
*evm.tx_mut() = tx_env;
let result = evm.transact()?.result;
if let Some(console_log) = &mut evm.context.external.console_log {
console_log.0.drain(..).for_each(|log| {
trace!(
"Console logs: {:?}",
HardhatConsoleCalls::decode(log).unwrap().to_string()
)
});
};
outcome_sender.send(Ok(Outcome::CallCompleted(result)))?;
}
Instruction::SetGasPrice {
gas_price,
outcome_sender,
} => {
evm.tx_mut().gas_price = U256::from_limbs(gas_price.0);
outcome_sender.send(Ok(Outcome::SetGasPriceCompleted))?;
}
Instruction::Transaction {
tx_env,
outcome_sender,
} => {
*evm.tx_mut() = tx_env;
let execution_result = match evm.transact_commit() {
Ok(result) => {
if let Some(console_log) = &mut evm.context.external.console_log {
console_log.0.drain(..).for_each(|log| {
trace!(
"Console logs: {:?}",
HardhatConsoleCalls::decode(log).unwrap().to_string()
)
});
};
result
}
Err(e) => {
outcome_sender.send(Err(ArbiterCoreError::EVMError(e)))?;
continue;
}
};
cumulative_gas_per_block += eU256::from(execution_result.gas_used());
let block_number = convert_uint_to_u64(evm.block().number)?;
let receipt_data = ReceiptData {
block_number,
transaction_index,
cumulative_gas_per_block,
};
let mut logs = db.logs.write()?;
match logs.get_mut(&evm.block().number) {
Some(log_vec) => {
log_vec.extend(revm_logs_to_ethers_logs(
execution_result.logs().to_vec(),
&receipt_data,
));
}
None => {
logs.insert(
evm.block().number,
revm_logs_to_ethers_logs(
execution_result.logs().to_vec(),
&receipt_data,
),
);
}
}
match event_broadcaster.send(Broadcast::Event(
execution_result.logs().to_vec(),
receipt_data.clone(),
)) {
Ok(_) => {}
Err(_) => {
warn!(
"Event was not sent to any listeners. Are there any listeners?"
)
}
}
outcome_sender.send(Ok(Outcome::TransactionCompleted(
execution_result,
receipt_data,
)))?;
transaction_index += U64::from(1);
}
Instruction::Query {
environment_data,
outcome_sender,
} => {
let outcome = match environment_data {
EnvironmentData::BlockNumber => {
Ok(Outcome::QueryReturn(evm.block().number.to_string()))
}
EnvironmentData::BlockTimestamp => {
Ok(Outcome::QueryReturn(evm.block().timestamp.to_string()))
}
EnvironmentData::GasPrice => {
Ok(Outcome::QueryReturn(evm.tx().gas_price.to_string()))
}
EnvironmentData::Balance(address) => {
match db
.state
.read()
.unwrap()
.accounts
.get::<Address>(&address.as_fixed_bytes().into())
{
Some(account) => {
Ok(Outcome::QueryReturn(account.info.balance.to_string()))
}
None => Err(ArbiterCoreError::AccountDoesNotExistError),
}
}
EnvironmentData::TransactionCount(address) => {
match db
.state
.read()
.unwrap()
.accounts
.get::<Address>(&address.as_fixed_bytes().into())
{
Some(account) => {
Ok(Outcome::QueryReturn(account.info.nonce.to_string()))
}
None => Err(ArbiterCoreError::AccountDoesNotExistError),
}
}
EnvironmentData::Logs { filter } => {
let logs = db.logs.read().unwrap();
let from_block = U256::from(
filter
.block_option
.get_from_block()
.ok_or(ArbiterCoreError::MissingDataError)?
.as_number()
.ok_or(ArbiterCoreError::MissingDataError)?
.0[0],
);
let to_block = U256::from(
filter
.block_option
.get_to_block()
.ok_or(ArbiterCoreError::MissingDataError)?
.as_number()
.ok_or(ArbiterCoreError::MissingDataError)?
.0[0],
);
let mut return_logs = Vec::new();
logs.keys().for_each(|blocknum| {
if blocknum >= &from_block && blocknum <= &to_block {
return_logs.extend(logs.get(blocknum).cloned().unwrap());
}
});
return_logs.retain(|log| {
filter.topics.iter().any(|topic_option| match topic_option {
Some(topic_val_or_array) => match topic_val_or_array {
ValueOrArray::Value(topic) => match topic {
Some(topic) => log.topics.contains(topic),
None => true,
},
ValueOrArray::Array(topics) => {
topics.iter().any(|topic| match topic {
Some(topic) => log.topics.contains(topic),
None => true,
})
}
},
None => true,
})
});
return_logs.retain(|log| {
filter.address.iter().any(|address_value_or_array| {
match address_value_or_array {
ValueOrArray::Value(address) => &log.address == address,
ValueOrArray::Array(addresses) => {
addresses.iter().any(|addr| &log.address == addr)
}
}
})
});
Ok(Outcome::QueryReturn(
serde_json::to_string(&return_logs).unwrap(),
))
}
};
outcome_sender.send(outcome)?;
}
Instruction::Stop(outcome_sender) => {
match event_broadcaster.send(Broadcast::StopSignal) {
Ok(_) => {}
Err(_) => {
warn!("Stop signal was not sent to any listeners. Are there any listeners?")
}
}
outcome_sender.send(Ok(Outcome::StopCompleted(db)))?;
break;
}
}
}
Ok(())
});
self.handle = Some(handle);
self
}
/// Stops the environment and returns the database handed back by the worker
/// thread.
///
/// Sends [`Instruction::Stop`], waits for the reply, logs that the
/// environment was stopped, drops this environment's instruction sender and
/// joins the worker thread.
///
/// # Errors
///
/// Returns an error if the instruction cannot be sent, the reply cannot be
/// received or is itself an error, the thread cannot be joined
/// ([`ArbiterCoreError::JoinError`]), or the thread finished with an error.
///
/// # Panics
///
/// Panics if there is no join handle stored in `self.handle`.
pub fn stop(mut self) -> Result<ArbiterDB, ArbiterCoreError> {
    let (reply_tx, reply_rx) = bounded(1);
    let stop_instruction = Instruction::Stop(reply_tx);
    self.socket.instruction_sender.send(stop_instruction)?;

    let db = match reply_rx.recv()?? {
        Outcome::StopCompleted(stopped_db) => stopped_db,
        _ => unreachable!(),
    };

    match &self.parameters.label {
        Some(label) => {
            warn!("Stopped environment with label: {}", label);
        }
        None => {
            warn!("Stopped environment with no label.");
        }
    }

    drop(self.socket.instruction_sender);
    let worker = self.handle.take().unwrap();
    worker.join().map_err(|_| ArbiterCoreError::JoinError)??;
    Ok(db)
}
}
/// The set of channel endpoints an [`Environment`] communicates through.
#[derive(Debug, Clone)]
pub(crate) struct Socket {
    /// Shared sender used to submit [`Instruction`]s to the worker thread.
    pub(crate) instruction_sender: Arc<InstructionSender>,
    /// Receiver the worker thread reads [`Instruction`]s from.
    pub(crate) instruction_receiver: InstructionReceiver,
    /// Broadcast sender used by the worker thread to publish [`Broadcast`] messages.
    pub(crate) event_broadcaster: BroadcastSender<Broadcast>,
}
/// Messages published by the environment's worker thread on the broadcast channel.
#[derive(Clone, Debug)]
pub enum Broadcast {
    /// Published when the worker thread handles a stop instruction.
    StopSignal,
    /// Published after a transaction is committed: the logs it emitted
    /// together with the receipt data of that transaction.
    Event(Vec<Log>, ReceiptData),
}
/// Converts a `U256` into a `U64` by rendering it as a decimal string and
/// parsing that string as a `u64`.
///
/// # Errors
///
/// Returns the parse error (converted into [`ArbiterCoreError`]) when the
/// value does not fit in a `u64`.
#[inline]
fn convert_uint_to_u64(input: U256) -> Result<U64, ArbiterCoreError> {
    let decimal = input.to_string();
    let narrowed = decimal.parse::<u64>()?;
    Ok(narrowed.into())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Label used by tests that build an environment.
    pub(crate) const TEST_ENV_LABEL: &str = "test";
    const TEST_CONTRACT_SIZE_LIMIT: usize = 42069;
    const TEST_GAS_LIMIT: u64 = 1_333_333_333_337;

    /// The builder setters must record the values they are given.
    #[test]
    fn new_with_parameters() {
        let gas_limit = U256::from(TEST_GAS_LIMIT);
        let builder = Environment::builder()
            .with_label(TEST_ENV_LABEL)
            .with_contract_size_limit(TEST_CONTRACT_SIZE_LIMIT)
            .with_gas_limit(gas_limit);

        let params = &builder.parameters;
        assert_eq!(params.label, Some(TEST_ENV_LABEL.into()));
        assert_eq!(
            params.contract_size_limit.unwrap(),
            TEST_CONTRACT_SIZE_LIMIT
        );
        assert_eq!(params.gas_limit.unwrap(), gas_limit);
    }

    /// Values up to `u64::MAX` convert; anything larger is an error.
    #[test]
    fn conversion() {
        let small = convert_uint_to_u64(U256::from(10000)).unwrap();
        assert_eq!(small, U64::from(10000));

        let largest = convert_uint_to_u64(U256::from(u64::MAX)).unwrap();
        assert_eq!(largest, U64::from(u64::MAX));

        let too_large = U256::from(u64::MAX) + U256::from(1);
        assert!(convert_uint_to_u64(too_large).is_err());
    }
}