use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use dashmap::mapref::one::{Ref, RefMut};
use dashmap::DashMap;
use crate::blockifier::transaction_executor::{
TransactionExecutionOutput,
TransactionExecutorError,
TransactionExecutorResult,
};
use crate::bouncer::Bouncer;
use crate::concurrency::fee_utils::complete_fee_transfer_flow;
use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus};
use crate::concurrency::versioned_state::{
ThreadSafeVersionedState,
VersionedState,
VersionedStateError,
};
use crate::concurrency::TxIndex;
use crate::context::BlockContext;
use crate::metrics::{CALLS_RUNNING_NATIVE, TOTAL_CALLS};
use crate::state::cached_state::{ContractClassMapping, StateMaps, TransactionalState};
use crate::state::state_api::{StateReader, UpdatableState};
use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult};
use crate::transaction::transaction_execution::Transaction;
use crate::transaction::transactions::ExecutableTransaction;
// Test module for this file; compiled only for test builds and loaded from
// `worker_logic_test.rs` via the `#[path]` attribute.
#[cfg(test)]
#[path = "worker_logic_test.rs"]
pub mod test;
/// Panic message used by `lock_execution_output` and `extract_execution_output` when the
/// execution output of the requested transaction is not present in the map.
const EXECUTION_OUTPUTS_UNWRAP_ERROR: &str = "Execution task outputs should not be None.";
/// Everything recorded about a single execution of a transaction (built in `execute_tx`).
#[derive(Debug)]
pub struct ExecutionTaskOutput {
/// The initial reads taken from the transactional state cache; re-checked by `validate`.
pub reads: StateMaps,
/// The state changes produced by the execution. Left empty (default) when execution failed.
pub state_diff: StateMaps,
/// The contract classes taken from the transactional state. Left empty when execution failed.
pub contract_classes: ContractClassMapping,
/// Time elapsed around the `execute_raw` call.
pub run_time: Duration,
/// The value returned by `execute_raw` for this transaction.
pub result: TransactionExecutionResult<TransactionExecutionInfo>,
}
/// Atomic event counters collected while transactions are executed concurrently.
///
/// Every counter starts at zero (via `Default`) and is only ever incremented by one.
/// All accesses use `Ordering::Relaxed`.
#[derive(Default)]
pub struct ConcurrencyMetrics {
    /// Number of aborts recorded by `count_abort`.
    abort_counter: AtomicUsize,
    /// Number of aborts recorded by `count_abort_in_commit`.
    abort_in_commit_counter: AtomicUsize,
    /// Number of executions recorded by `count_execute`.
    execute_counter: AtomicUsize,
    /// Number of validations recorded by `count_validate`.
    validate_counter: AtomicUsize,
}

impl ConcurrencyMetrics {
    /// Adds one to the given counter.
    fn bump(counter: &AtomicUsize) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one abort.
    pub fn count_abort(&self) {
        Self::bump(&self.abort_counter);
    }

    /// Records one abort that happened during the commit phase.
    pub fn count_abort_in_commit(&self) {
        Self::bump(&self.abort_in_commit_counter);
    }

    /// Records one execution.
    pub fn count_execute(&self) {
        Self::bump(&self.execute_counter);
    }

    /// Records one validation.
    pub fn count_validate(&self) {
        Self::bump(&self.validate_counter);
    }

    /// Returns the current counter values as
    /// `(aborts, aborts_in_commit, executions, validations)`.
    ///
    /// Each counter is loaded separately, so the tuple is not an atomic snapshot of all four.
    pub fn get_metrics(&self) -> (usize, usize, usize, usize) {
        let [aborts, aborts_in_commit, executions, validations] = [
            &self.abort_counter,
            &self.abort_in_commit_counter,
            &self.execute_counter,
            &self.validate_counter,
        ]
        .map(|counter| counter.load(Ordering::Relaxed));
        (aborts, aborts_in_commit, executions, validations)
    }
}
/// Outcome of trying to commit a single transaction (see `commit_tx`).
#[derive(Debug, PartialEq)]
enum CommitResult {
/// The transaction was committed.
Success,
/// The bouncer update reported `TransactionExecutorError::BlockFull`.
NoRoomInBlock,
/// Validation in the commit phase reported that the transaction's reads are not valid.
ValidationFailed,
}
/// Shared state used by worker threads (each calling `run`) to execute, validate and commit
/// transactions.
pub struct WorkerExecutor<S: StateReader> {
/// Hands out tasks (`next_task`) and controls the commit phase.
pub scheduler: Scheduler,
/// The versioned state; a per-transaction view is obtained with `pin_version`.
pub state: ThreadSafeVersionedState<S>,
/// The transactions, keyed by their index.
pub txs: DashMap<TxIndex, Arc<Transaction>>,
/// Number of transactions added so far; the lock is held for the whole of `add_txs`.
pub n_txs: Mutex<usize>,
/// Execution outputs keyed by transaction index. Inserted by `execute_tx` and removed by
/// `extract_execution_output`.
pub execution_outputs: DashMap<TxIndex, ExecutionTaskOutput>,
/// Block context passed to transaction execution and used at commit time.
pub block_context: Arc<BlockContext>,
/// Bouncer updated (under its lock) when a transaction is committed.
pub bouncer: Arc<Mutex<Bouncer>>,
/// When set, `run` halts the scheduler and stops once this instant has passed.
pub execution_deadline: Option<Instant>,
/// Counters for executions, validations and aborts.
pub metrics: ConcurrencyMetrics,
}
impl<S: StateReader> WorkerExecutor<S> {
pub fn new(
state: ThreadSafeVersionedState<S>,
txs: Vec<Transaction>,
block_context: Arc<BlockContext>,
bouncer: Arc<Mutex<Bouncer>>,
execution_deadline: Option<Instant>,
) -> Self {
let n_txs = txs.len();
WorkerExecutor {
scheduler: Scheduler::new(n_txs),
state,
txs: txs.into_iter().enumerate().map(|(i, tx)| (i, Arc::new(tx))).collect(),
n_txs: Mutex::new(n_txs),
execution_outputs: DashMap::new(),
block_context,
bouncer,
execution_deadline,
metrics: ConcurrencyMetrics::default(),
}
}
pub fn initialize(
state: S,
txs: Vec<Transaction>,
block_context: Arc<BlockContext>,
bouncer: Arc<Mutex<Bouncer>>,
execution_deadline: Option<Instant>,
) -> Self {
let versioned_state = VersionedState::new(state);
let chunk_state = ThreadSafeVersionedState::new(versioned_state);
WorkerExecutor::new(chunk_state, txs, block_context, bouncer, execution_deadline)
}
/// Worker loop.
///
/// On every iteration the worker first checks the execution deadline, then commits whatever
/// can be committed, and finally asks the scheduler for a task and performs it. The loop ends
/// when the deadline has passed (the scheduler is halted first), when the scheduler reports
/// `Task::Done`, or when a validation task returns an error.
///
/// # Panics
///
/// Panics if a validation task returns an error while the scheduler is not done.
pub fn run(&self) {
    loop {
        let timed_out =
            matches!(self.execution_deadline, Some(deadline) if Instant::now() > deadline);
        if timed_out {
            log::debug!("Execution timed out.");
            self.scheduler.halt();
            break;
        }

        self.commit_while_possible();

        match self.scheduler.next_task() {
            Task::Done => break,
            // Nothing to do in this iteration; go around and ask again.
            Task::AskForTask => {}
            // No task is available right now; sleep for 1 microsecond before asking again.
            Task::NoTaskAvailable => thread::sleep(Duration::from_micros(1)),
            Task::ExecutionTask(tx_index) => self.execute(tx_index),
            Task::ValidationTask(tx_index) => {
                let validation = self.validate(tx_index, false);
                if validation.is_err() {
                    // A validation error is only expected once the scheduler is done.
                    assert!(self.scheduler.done());
                    break;
                }
            }
        }
    }
}
/// Appends `txs` to the executor and registers each of them with the scheduler.
///
/// The `n_txs` lock is held for the whole call. Returns the half-open index range
/// `(from_tx, to_tx)` assigned to the new transactions.
///
/// # Panics
///
/// Panics if the `n_txs` lock cannot be acquired.
pub fn add_txs(&self, txs: &[Transaction]) -> (TxIndex, TxIndex) {
    let mut tx_count = self.n_txs.lock().expect("Failed to lock n_txs");
    let first_new_index = *tx_count;
    let end_index = first_new_index + txs.len();

    for (tx_index, tx) in (first_new_index..end_index).zip(txs) {
        self.txs.insert(tx_index, Arc::new(tx.clone()));
        self.scheduler.new_tx(tx_index);
    }

    *tx_count = end_index;
    (first_new_index, end_index)
}
pub fn extract_execution_outputs(
&self,
from_tx: usize,
) -> Vec<TransactionExecutorResult<TransactionExecutionOutput>> {
let n_committed_txs = self.scheduler.get_n_committed_txs();
(from_tx..n_committed_txs)
.map(|tx_index| {
let execution_output = self.extract_execution_output(tx_index);
execution_output
.result
.map(|tx_execution_info| (tx_execution_info, execution_output.state_diff))
.map_err(TransactionExecutorError::from)
})
.collect()
}
/// Returns a shared handle to the transaction stored under `tx_index`.
///
/// # Panics
///
/// Panics if no transaction is stored under `tx_index`.
fn tx_at(&self, tx_index: TxIndex) -> Arc<Transaction> {
    let entry = self.txs.get(&tx_index).expect("Transaction missing");
    Arc::clone(entry.value())
}
/// Returns the current value of the `n_txs` counter.
///
/// # Panics
///
/// Panics if the `n_txs` lock cannot be acquired.
fn get_n_txs(&self) -> usize {
    let tx_count = self.n_txs.lock().expect("Failed to lock n_txs");
    *tx_count
}
/// Commits transactions one after another for as long as the scheduler offers one.
///
/// Does nothing if the commit phase cannot be entered. Otherwise, each transaction returned by
/// `try_commit` is passed to `commit_tx`:
/// - `Success`: continue with the next transaction.
/// - `NoRoomInBlock`: uncommit the transaction, halt the scheduler and stop.
/// - `ValidationFailed`: uncommit the transaction and stop.
///
/// # Panics
///
/// Panics if `commit_tx` returns an error.
fn commit_while_possible(&self) {
    let Some(mut tx_committer) = self.scheduler.try_enter_commit_phase() else {
        return;
    };

    while let Some(tx_index) = tx_committer.try_commit() {
        let commit_result = self.commit_tx(tx_index).unwrap_or_else(|_| {
            panic!("Commit transaction should not be called after clearing the state.");
        });
        match commit_result {
            CommitResult::Success => {}
            CommitResult::NoRoomInBlock => {
                tx_committer.uncommit();
                self.scheduler.halt();
                // Return explicitly so that no further commit attempt is made once the
                // scheduler has been halted, mirroring the `ValidationFailed` arm.
                return;
            }
            CommitResult::ValidationFailed => {
                tx_committer.uncommit();
                return;
            }
        }
    }
}
/// Performs an execution task: counts it in the metrics, executes the transaction, and then
/// reports the finished execution to the scheduler.
fn execute(&self, tx_index: TxIndex) {
    self.metrics.count_execute();
    self.execute_tx(tx_index);
    self.scheduler.finish_execution(tx_index);
}
/// Executes the transaction at `tx_index` on top of its pinned version of the state and stores
/// the resulting `ExecutionTaskOutput` under `tx_index`.
///
/// On success the transaction's writes and contract classes are applied to the versioned
/// state. On failure only the initial reads are kept; the state diff and the contract classes
/// are left empty.
fn execute_tx(&self, tx_index: TxIndex) {
    let mut tx_versioned_state = self.state.pin_version(tx_index);
    let mut transactional_state =
        TransactionalState::create_transactional(&mut tx_versioned_state);
    let tx = self.tx_at(tx_index);

    // Time only the raw execution itself. The last argument is the concurrency-mode flag.
    let started_at = Instant::now();
    let execution_result = tx.execute_raw(&mut transactional_state, &self.block_context, true);
    let run_time = started_at.elapsed();

    let (reads, state_diff, contract_classes) = if execution_result.is_ok() {
        let tx_reads_writes = transactional_state.cache.take();
        let state_diff = tx_reads_writes.to_state_diff().state_maps;
        let contract_classes = transactional_state.class_hash_to_class.take();
        tx_versioned_state.apply_writes(&state_diff, &contract_classes);
        (tx_reads_writes.initial_reads, state_diff, contract_classes)
    } else {
        // Failed execution: keep the reads, drop the writes.
        (
            transactional_state.cache.take().initial_reads,
            StateMaps::default(),
            HashMap::default(),
        )
    };

    let execution_output =
        ExecutionTaskOutput { reads, state_diff, contract_classes, run_time, result: execution_result };
    self.execution_outputs.insert(tx_index, execution_output);
}
/// Validates the reads of the transaction at `tx_index` against its pinned state version.
///
/// Returns `Ok(true)` when the reads are valid and `Ok(false)` otherwise. When the reads are
/// invalid and the scheduler accepts the abort (`try_validation_abort`), the abort is counted,
/// the transaction's writes are deleted from the versioned state and the abort is reported as
/// finished to the scheduler.
///
/// If the transaction has no stored execution output, it must already be committed and the
/// call must not be part of the commit phase; in that case `Ok(true)` is returned.
///
/// # Errors
///
/// Propagates errors from `validate_reads` and `delete_writes`.
///
/// # Panics
///
/// Panics if the execution output is missing while in the commit phase, or while the
/// transaction status is not `Committed`.
fn validate(&self, tx_index: TxIndex, commit_phase: bool) -> Result<bool, VersionedStateError> {
    self.metrics.count_validate();
    let tx_versioned_state = self.state.pin_version(tx_index);

    let execution_output = match self.lock_execution_output_opt(tx_index) {
        Some(output) => output,
        None => {
            assert!(!commit_phase, "Missing execution output in commit phase.");
            let status = self.scheduler.get_tx_status(tx_index);
            assert_eq!(
                status,
                TransactionStatus::Committed,
                "Missing execution output with tx_status={status:?}",
            );
            return Ok(true);
        }
    };

    let reads_valid = tx_versioned_state.validate_reads(&execution_output.reads)?;
    if reads_valid {
        return Ok(true);
    }

    // The reads are stale; abort the transaction if the scheduler allows it.
    if self.scheduler.try_validation_abort(tx_index, commit_phase) {
        self.metrics.count_abort();
        tx_versioned_state
            .delete_writes(&execution_output.state_diff, &execution_output.contract_classes)?;
        self.scheduler.finish_abort(tx_index);
    }
    Ok(false)
}
/// Tries to commit the transaction at `tx_index`.
///
/// The transaction is first validated in commit-phase mode. If its reads are not valid, the
/// failure is counted and `CommitResult::ValidationFailed` is returned. Otherwise, for a
/// transaction whose execution succeeded, the sequencer key is added to the state-changes
/// keys, the call metrics are incremented, the bouncer is updated, and the fee transfer flow is
/// completed. A transaction whose execution failed is only logged as "rejected".
///
/// Returns `CommitResult::NoRoomInBlock` when the bouncer reports
/// `TransactionExecutorError::BlockFull`, and `CommitResult::Success` otherwise.
///
/// # Errors
///
/// Propagates errors from `validate`.
///
/// # Panics
///
/// Panics if the execution output is missing, if the bouncer lock cannot be acquired, or if
/// the bouncer update fails with any error other than `BlockFull`.
fn commit_tx(&self, tx_index: TxIndex) -> Result<CommitResult, VersionedStateError> {
    let reads_valid = self.validate(tx_index, true)?;
    if !reads_valid {
        self.metrics.count_abort_in_commit();
        return Ok(CommitResult::ValidationFailed);
    }

    let mut tx_versioned_state = self.state.pin_version(tx_index);
    let mut execution_output_refmut = self.lock_execution_output(tx_index);
    let execution_output = execution_output_refmut.value_mut();
    let mut tx_state_changes_keys = execution_output.state_diff.keys();
    let tx = self.tx_at(tx_index);

    let execution_status = match execution_output.result.as_mut() {
        Err(_) => "rejected",
        Ok(tx_execution_info) => {
            let tx_context = self.block_context.to_tx_context(tx.as_ref());
            let concurrency_mode = true;
            tx_state_changes_keys.update_sequencer_key_in_storage(
                &tx_context,
                tx_execution_info,
                concurrency_mode,
            );

            let execution_summary =
                tx_execution_info.summarize(&self.block_context.versioned_constants);
            let calls = execution_summary.call_summary;
            TOTAL_CALLS.increment(calls.n_calls);
            CALLS_RUNNING_NATIVE.increment(calls.n_calls_running_native);

            // The bouncer guard is a temporary of this statement only, so it is released
            // before the result is inspected.
            let bouncer_result = self.bouncer.lock().expect("Bouncer lock failed.").try_update(
                &tx_versioned_state,
                &tx_state_changes_keys,
                &execution_summary,
                &tx_execution_info.summarize_builtins(),
                &tx_execution_info.receipt.resources,
                &self.block_context.versioned_constants,
                tx_execution_info.receipt.gas.l2_gas,
            );
            match bouncer_result {
                Ok(_) => {}
                Err(TransactionExecutorError::BlockFull) => {
                    return Ok(CommitResult::NoRoomInBlock);
                }
                Err(error) => {
                    panic!("Bouncer update failed. {error:?}: {error}");
                }
            }

            complete_fee_transfer_flow(
                &tx_context,
                tx_execution_info,
                &mut execution_output.state_diff,
                &mut tx_versioned_state,
                tx.as_ref(),
            );

            if tx_execution_info.is_reverted() {
                "reverted"
            } else {
                "successfully executed"
            }
        }
    };

    let tx_hash = Transaction::tx_hash(tx.as_ref());
    let run_time = execution_output.run_time.as_millis();
    let n_reads = execution_output.reads.storage.len();
    let n_writes = execution_output.state_diff.storage.len();
    log::debug!(
        "Transaction with tx_hash: {tx_hash} {execution_status}. Execution time: \
        {run_time}ms. number of storage reads: {n_reads}, number of storage writes: \
        {n_writes}."
    );
    Ok(CommitResult::Success)
}
/// Returns a mutable guard over the execution output stored under `tx_index`.
///
/// # Panics
///
/// Panics with `EXECUTION_OUTPUTS_UNWRAP_ERROR` if no output is stored under `tx_index`.
pub fn lock_execution_output(
    &self,
    tx_index: TxIndex,
) -> RefMut<'_, TxIndex, ExecutionTaskOutput> {
    let entry = self.execution_outputs.get_mut(&tx_index);
    entry.expect(EXECUTION_OUTPUTS_UNWRAP_ERROR)
}
/// Returns a shared guard over the execution output stored under `tx_index`, or `None` if no
/// output is stored there.
pub fn lock_execution_output_opt(
    &self,
    tx_index: TxIndex,
) -> Option<Ref<'_, TxIndex, ExecutionTaskOutput>> {
    let outputs = &self.execution_outputs;
    outputs.get(&tx_index)
}
/// Removes the execution output stored under `tx_index` from the map and returns it.
///
/// # Panics
///
/// Panics with `EXECUTION_OUTPUTS_UNWRAP_ERROR` if no output is stored under `tx_index`.
pub fn extract_execution_output(&self, tx_index: TxIndex) -> ExecutionTaskOutput {
    let (_removed_index, output) =
        self.execution_outputs.remove(&tx_index).expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
    output
}
}
impl<U: UpdatableState> WorkerExecutor<U> {
    /// Logs a summary of the concurrent run (transaction count, committed chunk size and the
    /// metric counters), then takes the inner versioned state and delegates to its
    /// `commit_chunk_and_recover_block_state` with `n_committed_txs`, returning the result.
    pub fn commit_chunk_and_recover_block_state(&self, n_committed_txs: usize) -> U {
        let counters = self.metrics.get_metrics();
        let (abort_counter, abort_in_commit_counter, execute_counter, validate_counter) = counters;
        let n_txs = self.get_n_txs();

        log::debug!(
            "Concurrent execution done. Number of transactions: {n_txs}; Committed chunk size: \
            {n_committed_txs}; Execute counter: {execute_counter}; Validate counter: \
            {validate_counter}; Abort counter: {abort_counter}; Abort in commit counter: \
            {abort_in_commit_counter}"
        );

        self.state.into_inner_state().commit_chunk_and_recover_block_state(n_committed_txs)
    }
}