use std::cmp;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use humantime::format_duration;
use rayon::prelude::*;
use tycho_executor::{Executor, ExecutorInspector, ExecutorParams, ParsedConfig, TxError};
use tycho_types::cell::HashBytes;
use tycho_types::models::*;
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastHashMap, FastHashSet};
use super::messages_buffer::MessageGroup;
use super::types::{
AccountId, ExecutedTransaction, ParsedMessage, ShardAccountStuff, SkippedTransaction,
};
use crate::collator::work_units::ExecuteWu;
use crate::tracing_targets;
pub(super) struct MessagesExecutor {
shard_id: ShardIdent,
min_next_lt: u64,
config: Arc<ParsedConfig>,
params: Arc<ExecutorParams>,
accounts_cache: AccountsCache,
wu_params_execute: WorkUnitsParamsExecute,
}
impl MessagesExecutor {
pub fn new(
shard_id: ShardIdent,
min_next_lt: u64,
config: Arc<ParsedConfig>,
params: Arc<ExecutorParams>,
shard_accounts: ShardAccounts,
wu_params_execute: WorkUnitsParamsExecute,
) -> Self {
Self {
shard_id,
min_next_lt,
config,
params,
accounts_cache: AccountsCache {
workchain_id: shard_id.workchain().try_into().unwrap(),
shard_accounts,
items: Default::default(),
},
wu_params_execute,
}
}
pub fn shard_id(&self) -> ShardIdent {
self.shard_id
}
pub fn min_next_lt(&self) -> u64 {
self.min_next_lt
}
pub fn executor_params(&self) -> &Arc<ExecutorParams> {
&self.params
}
pub fn into_accounts_cache_raw(
self,
) -> (
impl ExactSizeIterator<Item = Box<ShardAccountStuff>>,
ShardAccounts,
) {
let AccountsCache {
shard_accounts,
items,
..
} = self.accounts_cache;
(items.into_values(), shard_accounts)
}
pub fn take_account_stuff_if<F>(
&mut self,
account_id: &AccountId,
f: F,
) -> Result<Option<Box<ShardAccountStuff>>>
where
F: FnOnce(&ShardAccountStuff) -> bool,
{
self.accounts_cache.take_account_stuff_if(account_id, f)
}
pub fn execute_group(
&mut self,
msg_group: MessageGroup,
execute_wu: &mut ExecuteWu,
) -> Result<ExecutedGroup> {
tracing::trace!(target: tracing_targets::EXEC_MANAGER, "execute messages group");
let labels = &[("workchain", self.shard_id.workchain().to_string())];
let mut ext_msgs_skipped = 0;
let group_horizontal_size = msg_group.len();
let group_accounts_count = msg_group.accounts_count();
let group_messages_count = msg_group.messages_count();
let group_mean_vert_size: usize = group_messages_count
.checked_div(group_horizontal_size)
.unwrap_or_default();
let mut group_max_vert_size = 0;
let mut items = Vec::with_capacity(group_messages_count);
let mut ext_msgs_error_count = 0;
let mut max_account_msgs_exec_time = Duration::ZERO;
let mut total_exec_time = Duration::ZERO;
let mut group_gas = 0u128;
let min_next_lt = self.min_next_lt;
let config = self.config.clone();
let params = self.params.clone();
let mut touched_account_ids = FastHashSet::default();
let accounts_cache = Arc::new(&self.accounts_cache);
let result = msg_group
.into_par_iter()
.map(|(account_id, msgs)| {
Self::execute_subgroup(
account_id,
msgs,
&accounts_cache,
min_next_lt,
&config,
¶ms,
)
})
.collect_vec_list();
for result in result {
for executed in result {
self.save_subgroup_result(
&mut ext_msgs_skipped,
&mut max_account_msgs_exec_time,
&mut total_exec_time,
&mut ext_msgs_error_count,
&mut group_max_vert_size,
&mut group_gas,
&mut items,
&mut touched_account_ids,
executed?,
)?;
}
}
let group_exec_wu = execute_wu.append_group_exec_wu(
&self.wu_params_execute,
group_accounts_count as u64,
group_gas,
);
let mean_account_msgs_exec_time = total_exec_time
.checked_div(group_horizontal_size as u32)
.unwrap_or_default();
tracing::trace!(target: tracing_targets::EXEC_MANAGER,
group_horizontal_size, group_max_vert_size, group_accounts_count,
total_exec_time = %format_duration(total_exec_time),
mean_account_msgs_exec_time = %format_duration(mean_account_msgs_exec_time),
max_account_msgs_exec_time = %format_duration(max_account_msgs_exec_time),
group_messages_count, group_gas, group_exec_wu,
"execute_group",
);
metrics::gauge!("tycho_do_collate_one_tick_group_messages_count", labels)
.set(group_messages_count as f64);
metrics::gauge!("tycho_do_collate_one_tick_group_horizontal_size", labels)
.set(group_horizontal_size as f64);
metrics::gauge!("tycho_do_collate_one_tick_group_mean_vert_size", labels)
.set(group_mean_vert_size as f64);
metrics::gauge!("tycho_do_collate_one_tick_group_max_vert_size", labels)
.set(group_max_vert_size as f64);
metrics::histogram!(
"tycho_do_collate_one_tick_account_msgs_exec_mean_time",
labels
)
.record(mean_account_msgs_exec_time);
metrics::histogram!(
"tycho_do_collate_one_tick_account_msgs_exec_max_time",
labels
)
.record(max_account_msgs_exec_time);
Ok(ExecutedGroup {
items,
ext_msgs_error_count,
ext_msgs_skipped,
touched_account_ids,
})
}
#[allow(clippy::vec_box)]
fn execute_subgroup(
account_id: HashBytes,
msgs: Vec<ParsedMessage>,
accounts_cache: &AccountsCache,
min_next_lt: u64,
config: &ParsedConfig,
params: &ExecutorParams,
) -> Result<ExecutedTransactions> {
let shard_account_stuff = accounts_cache.get_account_stuff(&account_id)?;
Self::execute_messages(shard_account_stuff, msgs, min_next_lt, config, params)
}
#[allow(clippy::too_many_arguments)]
fn save_subgroup_result(
&mut self,
ext_msgs_skipped: &mut u64,
max_account_msgs_exec_time: &mut Duration,
total_exec_time: &mut Duration,
ext_msgs_error_count: &mut u64,
group_max_vert_size: &mut usize,
group_gas: &mut u128,
items: &mut Vec<ExecutedTickItem>,
touched_account_ids: &mut FastHashSet<HashBytes>,
executed: ExecutedTransactions,
) -> Result<()> {
*ext_msgs_skipped += executed.ext_msgs_skipped;
*max_account_msgs_exec_time = (*max_account_msgs_exec_time).max(executed.exec_time);
*total_exec_time += executed.exec_time;
*group_max_vert_size = cmp::max(*group_max_vert_size, executed.transactions.len());
let mut has_executed = false;
for tx in executed.transactions {
match tx.result {
TransactionResult::Executed(executed) => {
self.min_next_lt = cmp::max(self.min_next_lt, executed.next_lt);
*group_gas = group_gas.saturating_add(executed.gas_used as u128);
items.push(ExecutedTickItem {
in_message: tx.in_message,
executed,
});
has_executed = true;
}
TransactionResult::Skipped(skipped) => {
tracing::trace!(
target: tracing_targets::EXEC_MANAGER,
account_addr = %executed.account_state.account_addr,
message_hash = %tx.in_message.cell().repr_hash(),
"skipped external message",
);
*group_gas = group_gas.saturating_add(skipped.gas_used as u128);
*ext_msgs_error_count += 1;
}
}
}
let account_addr = executed.account_state.account_addr;
if has_executed {
touched_account_ids.insert(account_addr);
}
self.accounts_cache
.add_account_stuff(executed.account_state);
Ok(())
}
#[allow(clippy::vec_box)]
fn execute_messages(
mut account_state: Box<ShardAccountStuff>,
msgs: Vec<ParsedMessage>,
min_next_lt: u64,
config: &ParsedConfig,
params: &ExecutorParams,
) -> Result<ExecutedTransactions> {
let mut ext_msgs_skipped = 0;
let timer = std::time::Instant::now();
let mut transactions = Vec::with_capacity(msgs.len());
let account_is_empty = account_state.is_empty()?;
for msg in msgs {
if msg.is_external() && account_is_empty {
ext_msgs_skipped += 1;
continue;
}
transactions.push(execute_ordinary_transaction_impl(
&mut account_state,
msg,
min_next_lt,
config,
params,
)?);
}
Ok(ExecutedTransactions {
account_state,
transactions,
exec_time: timer.elapsed(),
ext_msgs_skipped,
})
}
pub fn execute_ordinary_transaction(
&mut self,
mut account_stuff: Box<ShardAccountStuff>,
in_message: ParsedMessage,
) -> Result<ExecutedOrdinaryTransaction> {
let min_next_lt = self.min_next_lt;
let config = self.config.clone();
let params = self.params.clone();
let (account_stuff, executed) = execute_ordinary_transaction_impl(
&mut account_stuff,
in_message,
min_next_lt,
&config,
¶ms,
)
.map(|executed| (account_stuff, executed))?;
if let TransactionResult::Executed(tx) = &executed.result {
self.min_next_lt = cmp::max(min_next_lt, tx.next_lt);
}
self.accounts_cache.add_account_stuff(account_stuff);
Ok(executed)
}
pub fn execute_ticktock_transaction(
&mut self,
mut account_stuff: Box<ShardAccountStuff>,
tick_tock: TickTock,
) -> Result<TransactionResult> {
let min_next_lt = self.min_next_lt;
let config = self.config.clone();
let params = self.params.clone();
let executed = execute_ticktock_transaction(
&mut account_stuff,
tick_tock,
min_next_lt,
&config,
¶ms,
)?;
if let TransactionResult::Executed(tx) = &executed {
self.min_next_lt = cmp::max(min_next_lt, tx.next_lt);
}
self.accounts_cache.add_account_stuff(account_stuff);
Ok(executed)
}
}
struct AccountsCache {
workchain_id: i8,
shard_accounts: ShardAccounts,
items: FastHashMap<AccountId, Box<ShardAccountStuff>>,
}
impl AccountsCache {
fn take_account_stuff_if<F>(
&mut self,
account_id: &AccountId,
f: F,
) -> Result<Option<Box<ShardAccountStuff>>>
where
F: FnOnce(&ShardAccountStuff) -> bool,
{
match self.items.entry(*account_id) {
Entry::Occupied(entry) => {
if f(entry.get()) {
return Ok(Some(entry.remove()));
}
}
Entry::Vacant(entry) => {
if let Some((_, state)) = self.shard_accounts.get(account_id)? {
let account_stuff =
ShardAccountStuff::new(self.workchain_id, account_id, state)
.map(Box::new)?;
if f(&account_stuff) {
return Ok(Some(account_stuff));
}
entry.insert(account_stuff);
}
}
}
Ok(None)
}
fn get_account_stuff(&self, account_id: &AccountId) -> Result<Box<ShardAccountStuff>> {
if let Some(account) = self.items.get(account_id) {
Ok(account.clone())
} else if let Some((_depth, shard_account)) = self.shard_accounts.get(account_id)? {
ShardAccountStuff::new(self.workchain_id, account_id, shard_account).map(Box::new)
} else {
Ok(Box::new(ShardAccountStuff::new_empty(
self.workchain_id,
account_id,
)))
}
}
fn add_account_stuff(&mut self, account_stuff: Box<ShardAccountStuff>) {
tracing::trace!(
target: tracing_targets::EXEC_MANAGER,
account_addr = %account_stuff.account_addr,
"updating shard account"
);
self.items.insert(account_stuff.account_addr, account_stuff);
}
}
pub struct ExecutedGroup {
pub items: Vec<ExecutedTickItem>,
pub ext_msgs_error_count: u64,
pub ext_msgs_skipped: u64,
pub touched_account_ids: FastHashSet<HashBytes>,
}
pub struct ExecutedTickItem {
pub in_message: ParsedMessage,
pub executed: ExecutedTransaction,
}
pub struct ExecutedTransactions {
pub account_state: Box<ShardAccountStuff>,
pub transactions: Vec<ExecutedOrdinaryTransaction>,
pub exec_time: Duration,
pub ext_msgs_skipped: u64,
}
pub struct ExecutedOrdinaryTransaction {
pub result: TransactionResult,
pub in_message: ParsedMessage,
}
pub enum TransactionResult {
Executed(ExecutedTransaction),
Skipped(SkippedTransaction),
}
fn execute_ordinary_transaction_impl(
account_stuff: &mut ShardAccountStuff,
in_message: ParsedMessage,
min_lt: u64,
config: &ParsedConfig,
params: &ExecutorParams,
) -> Result<ExecutedOrdinaryTransaction> {
tracing::trace!(
target: tracing_targets::EXEC_MANAGER,
account_addr = %account_stuff.account_addr,
message_hash = %in_message.cell().repr_hash(),
message_kind = ?in_message.kind(),
"executing ordinary message",
);
let _histogram = HistogramGuard::begin("tycho_collator_execute_ordinary_time");
let is_external = matches!(in_message.info(), MsgInfo::ExtIn(_));
let mut inspector = ExecutorInspector::default();
let uncommited = Executor::new(params, config)
.with_min_lt(account_stuff.align_min_lt(min_lt))
.begin_ordinary_ext(
&account_stuff.make_std_addr(),
is_external,
in_message.cell().clone(),
&account_stuff.shard_account,
Some(&mut inspector),
);
let output = match uncommited {
Ok(uncommited) => uncommited.commit()?,
Err(TxError::Skipped) if is_external => {
return Ok(ExecutedOrdinaryTransaction {
result: TransactionResult::Skipped(SkippedTransaction {
gas_used: inspector.total_gas_used,
}),
in_message,
});
}
Err(e) => return Err(e.into()),
};
account_stuff.apply_transaction(
output.new_state,
output.new_state_meta,
output.transaction.clone(),
&output.transaction_meta,
inspector.public_libs_diff,
);
Ok(ExecutedOrdinaryTransaction {
result: TransactionResult::Executed(ExecutedTransaction {
transaction: output.transaction,
out_msgs: output.transaction_meta.out_msgs,
gas_used: inspector.total_gas_used,
next_lt: output.transaction_meta.next_lt,
burned: output.burned,
}),
in_message,
})
}
fn execute_ticktock_transaction(
account_stuff: &mut ShardAccountStuff,
kind: TickTock,
min_lt: u64,
config: &ParsedConfig,
params: &ExecutorParams,
) -> Result<TransactionResult> {
tracing::trace!(
target: tracing_targets::EXEC_MANAGER,
account_addr = %account_stuff.account_addr,
?kind,
"executing ticktock",
);
let _histogram = HistogramGuard::begin("tycho_collator_execute_ticktock_time");
let mut inspector = ExecutorInspector::default();
let uncommited = Executor::new(params, config)
.with_min_lt(account_stuff.align_min_lt(min_lt))
.begin_tick_tock_ext(
&account_stuff.make_std_addr(),
kind,
&account_stuff.shard_account,
Some(&mut inspector),
);
let output = match uncommited {
Ok(uncommited) => uncommited.commit()?,
Err(TxError::Skipped) => {
return Ok(TransactionResult::Skipped(SkippedTransaction {
gas_used: inspector.total_gas_used,
}));
}
Err(TxError::Fatal(e)) => return Err(e),
};
account_stuff.apply_transaction(
output.new_state,
output.new_state_meta,
output.transaction.clone(),
&output.transaction_meta,
inspector.public_libs_diff,
);
Ok(TransactionResult::Executed(ExecutedTransaction {
transaction: output.transaction,
out_msgs: output.transaction_meta.out_msgs,
gas_used: inspector.total_gas_used,
next_lt: output.transaction_meta.next_lt,
burned: output.burned,
}))
}