use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use rayon::iter::{IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use tycho_block_util::archive::WithArchiveData;
use tycho_block_util::block::{BlockStuff, shard_ident_at_depth};
use tycho_block_util::config::BlockchainConfigExt;
use tycho_block_util::dict::{
RelaxedAugDict, merge_relaxed_aug_dicts, split_aug_dict, split_aug_dict_raw,
};
use tycho_block_util::queue::{QueuePartitionIdx, SerializedQueueDiff};
use tycho_block_util::state::ShardStateStuff;
use tycho_consensus::prelude::ConsensusConfigExt;
use tycho_core::global_config::ZerostateId;
use tycho_types::boc;
use tycho_types::cell::Lazy;
use tycho_types::merkle::*;
use tycho_types::models::{ShardIdent, *};
use tycho_types::prelude::*;
use tycho_util::FastHashMap;
use tycho_util::metrics::HistogramGuard;
use super::PrevData;
use super::phase::{Phase, PhaseState};
use crate::collator::debug_info::BlockDebugInfo;
use crate::collator::do_collate::work_units::FinalizeWu;
use crate::collator::error::{CollationCancelReason, CollatorError};
use crate::collator::execution_manager::MessagesExecutor;
use crate::collator::messages_reader::{FinalizedMessagesReader, MessagesReader};
use crate::collator::types::{
AccountExistence, BlockCollationData, BlockSerializerCache, ExecuteResult, FinalizeBlockResult,
FinalizeMessagesReaderResult, FinalizeMetrics, PreparedInMsg, PreparedOutMsg, PublicLibsDiff,
};
use crate::internal_queue::types::diff::DiffZone;
use crate::internal_queue::types::message::EnqueuedMessage;
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::processed_upto::{ProcessedUptoInfoExtension, ProcessedUptoInfoStuff};
use crate::types::{
BlockCandidate, CollationSessionInfo, CollatorConfig, McData, PrevMcData,
ShardDescriptionShortExt, ShardHashesExt,
};
use crate::utils::block::detect_top_processed_to_anchor;
/// Phase state carried into the finalize phase: the executor with the
/// updated accounts cache, the execution results, and the accumulated
/// work-unit and timing metrics.
pub struct FinalizeState {
    /// Messages executor; consumed in `build_accounts` to obtain the
    /// updated accounts cache.
    pub executor: MessagesExecutor,
    /// Results of the execute phase (returned back from `finalize_block`).
    pub execute_result: ExecuteResult,
    /// Work units accounted during finalization.
    pub finalize_wu: FinalizeWu,
    /// Timing metrics collected during finalization.
    pub finalize_metrics: FinalizeMetrics,
}
impl PhaseState for FinalizeState {}
/// Input context for `Phase::<FinalizeState>::finalize_block`.
pub struct FinalizeBlockContext {
    /// Current collation session (provides collators short hash and seqno).
    pub collation_session: Arc<CollationSessionInfo>,
    /// Work units used since the last imported anchor (further accumulated
    /// and capped inside `finalize_block`).
    pub wu_used_from_last_anchor: u64,
    /// Usage tree of the previous state; consumed by the merkle update builder.
    pub usage_tree: UsageTree,
    /// Serialized queue diff of the collated block.
    pub serialized_diff: SerializedQueueDiff,
    pub collator_config: Arc<CollatorConfig>,
    /// Processed-upto info stored into the new state and block candidate.
    pub processed_upto: ProcessedUptoInfoStuff,
    /// Queue diffs tail length, written into `OutMsgQueueUpdates`.
    pub diff_tail_len: u32,
    /// Reusable BOC serialization cache for encoding the block.
    pub block_serializer_cache: BlockSerializerCache,
    /// Id of the network zerostate.
    pub zerostate_id: ZerostateId,
}
impl Phase<FinalizeState> {
/// Finalizes the messages reader: collects queue diff routers/statistics
/// for other updated top shard blocks, finalizes the reader into a queue
/// diff with messages, and accounts the queue-diff work units.
pub fn finalize_messages_reader<'a>(
    &mut self,
    messages_reader: MessagesReader<'a, EnqueuedMessage>,
    mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
    zerostate_id: ZerostateId,
) -> Result<FinalizeMessagesReaderResult, CollatorError> {
    // Ids of top blocks updated in other shards:
    // * collating masterchain — all known top shard blocks;
    // * collating a shard — updated top blocks of the *other* shards,
    //   plus the referenced master block itself.
    let top_other_updated_shard_blocks_ids =
        if self.state.collation_data.block_id_short.is_masterchain() {
            self.state
                .collation_data
                .top_shard_blocks
                .iter()
                .map(|b| b.block_id)
                .collect()
        } else {
            let mut top_other_updated_shard_blocks_ids: Vec<BlockId> = self
                .state
                .mc_data
                .shards
                .iter()
                .filter(|(shard, descr)| {
                    descr.top_sc_block_updated && shard != &self.state.shard_id
                })
                .map(|(shard_ident, descr)| descr.get_block_id(*shard_ident))
                .collect();
            top_other_updated_shard_blocks_ids.push(self.state.mc_data.block_id);
            top_other_updated_shard_blocks_ids
        };

    // For each collected top block, load its diff info from the queue and
    // resolve the router and statistics for partition 0.
    let mut other_updated_top_shard_diffs_info = FastHashMap::default();
    for top_block_id in top_other_updated_shard_blocks_ids.iter() {
        // Skip blocks that cannot have a diff in the queue.
        // NOTE(review): the second condition compares the loop-invariant
        // `mc_data.block_id.seqno` with `zerostate_id.seqno` — it looks
        // like it may have been intended to use `top_block_id.seqno`;
        // confirm against the queue adapter semantics.
        if top_block_id.seqno == 0 || self.state.mc_data.block_id.seqno <= zerostate_id.seqno {
            continue;
        }
        // A missing diff cancels the collation attempt.
        let diff_info = mq_adapter
            .get_diff_info(
                &top_block_id.shard,
                top_block_id.seqno,
                DiffZone::Both,
            )?
            .ok_or_else(|| {
                tracing::warn!(target: tracing_targets::COLLATOR,
                    "finalize_messages_reader: cannot get diff with stats from queue for block {}",
                    top_block_id.as_short_id(),
                );
                CollatorError::Cancelled(CollationCancelReason::DiffNotFoundInQueue(
                    top_block_id.as_short_id(),
                ))
            })?;
        other_updated_top_shard_diffs_info.insert(
            top_block_id.shard,
            mq_adapter.get_router_and_statistics(
                &top_block_id.as_short_id(),
                diff_info,
                QueuePartitionIdx::ZERO,
            )?,
        );
    }

    // Finalize the reader itself, using the executor's minimal next lt as
    // the boundary.
    let FinalizedMessagesReader {
        has_unprocessed_messages,
        queue_diff_with_msgs,
        current_msgs_exec_params,
    } = messages_reader.finalize(
        self.extra.executor.min_next_lt(),
        &other_updated_top_shard_diffs_info,
    )?;

    // Account work units for building the queue diff.
    let queue_diff_messages_count = queue_diff_with_msgs.messages.len();
    self.extra.finalize_wu.calculate_queue_diff_wu(
        &self.state.collation_config.work_units_params.finalize,
        queue_diff_messages_count as u64,
    );

    Ok(FinalizeMessagesReaderResult {
        queue_diff_with_msgs,
        current_msgs_exec_params,
        has_unprocessed_messages,
    })
}
/// Builds the new block, state update, and block candidate from the
/// collected collation data.
///
/// Steps: build accounts and in/out message descriptions (in parallel),
/// compute and optionally check the value flow, build the masterchain
/// state extra (masterchain only), assemble the new shard state and the
/// merkle state update, serialize the block to BOC, and finally produce
/// the [`BlockCandidate`] plus (for masterchain blocks) the new [`McData`].
pub fn finalize_block(
    mut self,
    ctx: FinalizeBlockContext,
) -> Result<(FinalizeBlockResult, ExecuteResult)> {
    tracing::debug!(target: tracing_targets::COLLATOR, "finalize_block()");
    let FinalizeBlockContext {
        collation_session,
        wu_used_from_last_anchor,
        usage_tree,
        serialized_diff: queue_diff,
        collator_config,
        processed_upto,
        diff_tail_len,
        block_serializer_cache,
        zerostate_id,
    } = ctx;
    let accounts_split_depth = collator_config.accounts_split_depth;
    let merkle_split_depth = collator_config.merkle_split_depth;
    let shard = self.state.collation_data.block_id_short.shard;
    let labels = &[("workchain", shard.workchain().to_string())];
    let histogram =
        HistogramGuard::begin_with_labels("tycho_collator_finalize_block_time_high", labels);
    let executor = self.extra.executor;
    let mut public_libraries = executor.executor_params().libraries.clone();
    let is_masterchain = shard.is_masterchain();
    let config_address = &self.state.mc_data.config.address;
    // Seqno of the master block this block will be referenced by: the block
    // itself for masterchain, the next master block for a shard.
    let ref_by_mc_seqno = if is_masterchain {
        self.state.collation_data.block_id_short.seqno
    } else {
        self.state.mc_data.block_id.seqno + 1
    };

    // Build in-msgs and out-msgs on spawned rayon tasks while accounts are
    // built on the current thread; results land in these slots.
    let mut processed_accounts_res = Ok(Default::default());
    let mut build_accounts_elapsed = Duration::ZERO;
    let mut in_msgs_res = Ok(Default::default());
    let mut build_in_msgs_elapsed = Duration::ZERO;
    let mut out_msgs_res = Ok(Default::default());
    let mut build_out_msgs_elapsed = Duration::ZERO;
    let histogram_build_accounts_and_messages = HistogramGuard::begin_with_labels(
        "tycho_collator_finalize_build_accounts_and_msgs_time_high",
        labels,
    );
    rayon::scope(|s| {
        s.spawn(|_| {
            let histogram = HistogramGuard::begin_with_labels(
                "tycho_collator_finalize_build_in_msgs_time_high",
                labels,
            );
            in_msgs_res = Self::build_in_msgs(&self.state.collation_data.in_msgs);
            build_in_msgs_elapsed = histogram.finish();
        });
        s.spawn(|_| {
            let histogram = HistogramGuard::begin_with_labels(
                "tycho_collator_finalize_build_out_msgs_time_high",
                labels,
            );
            out_msgs_res = Self::build_out_msgs(&self.state.collation_data.out_msgs);
            build_out_msgs_elapsed = histogram.finish();
        });
        let histogram = HistogramGuard::begin_with_labels(
            "tycho_collator_finalize_build_accounts_time_high",
            labels,
        );
        processed_accounts_res = Self::build_accounts(
            executor,
            config_address,
            accounts_split_depth,
            &mut public_libraries,
        );
        build_accounts_elapsed = histogram.finish();
    });
    let processed_accounts = processed_accounts_res?;
    let in_msgs = in_msgs_res?;
    let out_msgs = out_msgs_res?;
    self.extra
        .finalize_metrics
        .build_accounts_and_messages_in_parallel_elased =
        histogram_build_accounts_and_messages.finish();
    self.extra.finalize_metrics.update_shard_accounts_elapsed =
        processed_accounts.update_shard_accounts_elapsed;
    self.extra.finalize_metrics.build_accounts_blocks_elapsed =
        processed_accounts.build_accounts_blocks_elapsed;
    self.extra.finalize_metrics.build_accounts_elapsed = build_accounts_elapsed;
    self.extra.finalize_metrics.build_in_msgs_elapsed = build_in_msgs_elapsed;
    self.extra.finalize_metrics.build_out_msgs_elapsed = build_out_msgs_elapsed;

    // Compute (and, if configured, cross-check) the block's value flow.
    let value_flow = self.state.collation_data.finalize_value_flow(
        &processed_accounts.account_blocks,
        &processed_accounts.shard_accounts,
        &in_msgs,
        &out_msgs,
        &self.state.mc_data.config,
    )?;
    if collator_config.check_value_flow {
        Self::check_value_flow(
            &value_flow,
            &self.state.collation_data,
            &self.state.mc_data.config,
            &self.state.prev_shard_data,
            &in_msgs,
            &out_msgs,
            &processed_accounts,
        )?;
    }

    // Masterchain blocks carry a state extra; shard blocks carry a ref to
    // the master block instead.
    let (mc_state_extra, master_ref) = if is_masterchain {
        let histogram = HistogramGuard::begin_with_labels(
            "tycho_collator_finalize_build_mc_state_extra_time_high",
            labels,
        );
        let (processed_to_anchor, _) = processed_upto.get_min_externals_processed_to()?;
        let prev_state = &self.state.prev_shard_data.observable_states()[0];
        let prev_processed_to_anchor = self
            .state
            .prev_shard_data
            .processed_upto()
            .get_min_externals_processed_to()?
            .0;
        // New config params appear only when the config account changed.
        let config_params =
            processed_accounts
                .new_config_params
                .map(|params| BlockchainConfig {
                    address: *config_address,
                    params,
                });
        let (extra, min_ref_mc_seqno) = Self::create_mc_state_extra(
            &mut self.state.collation_data,
            processed_to_anchor,
            config_params,
            prev_state,
            prev_processed_to_anchor,
            collator_config,
            zerostate_id,
        )?;
        self.state
            .collation_data
            .update_ref_min_mc_seqno(min_ref_mc_seqno);
        self.extra.finalize_metrics.build_mc_state_extra_elapsed = histogram.finish();
        (Some(extra), None)
    } else {
        (None, Some(self.state.mc_data.make_block_ref()))
    };

    // Test-only hack: on `tycho_unstable` builds, every third block takes
    // its version from the HACK_MISMATCH_BLOCK_VER env var (0 otherwise).
    let version = if cfg!(tycho_unstable)
        && self
            .state
            .collation_data
            .block_id_short
            .seqno
            .is_multiple_of(3)
    {
        let val = std::env::var("HACK_MISMATCH_BLOCK_VER").unwrap_or_default();
        val.parse::<u32>().unwrap_or(0)
    } else {
        0
    };

    // Assemble the new block info header.
    let mut new_block_info = BlockInfo {
        version,
        key_block: matches!(&mc_state_extra, Some(extra) if extra.after_key_block),
        shard,
        seqno: self.state.collation_data.block_id_short.seqno,
        gen_utime: self.state.collation_data.gen_utime,
        gen_utime_ms: self.state.collation_data.gen_utime_ms,
        start_lt: self.state.collation_data.start_lt,
        end_lt: self.state.collation_data.next_lt,
        gen_validator_list_hash_short: collation_session.collators().short_hash,
        gen_catchain_seqno: collation_session.seqno(),
        min_ref_mc_seqno: self.state.collation_data.min_ref_mc_seqno,
        prev_key_block_seqno: self.state.mc_data.prev_key_block_seqno,
        master_ref: master_ref.as_ref().map(Lazy::new).transpose()?,
        ..Default::default()
    };
    let prev_ref = self.state.prev_shard_data.get_blocks_ref()?;
    new_block_info.set_prev_ref(&prev_ref);
    // Report the generating software version only if the capability is set.
    let bc_global_version = self.state.mc_data.config.get_global_version()?;
    if bc_global_version
        .capabilities
        .contains(GlobalCapability::CapReportVersion)
    {
        new_block_info.set_gen_software(Some(bc_global_version));
    }

    // Build the new shard state and the merkle state update.
    let new_state_root;
    let total_validator_fees;
    let (state_update, new_observable_state) = {
        let histogram = HistogramGuard::begin_with_labels(
            "tycho_collator_finalize_build_state_update_time_high",
            labels,
        );
        // Estimate the resulting accounts count: reuse the count cached in
        // `underload_history` (falling back to counting keys in the prev
        // state's accounts dict), then apply added/removed deltas.
        let prev_state = &self.state.prev_shard_data.pure_states()[0];
        let mut shard_accounts_count = prev_state.state().underload_history;
        if shard_accounts_count == 0 {
            shard_accounts_count =
                prev_state.state().accounts.load()?.raw_keys().count() as u64;
        }
        shard_accounts_count = shard_accounts_count
            .saturating_add(processed_accounts.added_accounts_count)
            .saturating_sub(processed_accounts.removed_accounts_count);
        self.state.collation_data.shard_accounts_count = shard_accounts_count;
        self.state.collation_data.updated_accounts_count =
            processed_accounts.updated_accounts_count;
        self.state.collation_data.added_accounts_count =
            processed_accounts.added_accounts_count;
        self.state.collation_data.removed_accounts_count =
            processed_accounts.removed_accounts_count;

        // Account work units for finalization and resume-collation.
        let updated_accounts_count = self.state.collation_data.updated_accounts_count;
        let in_msgs_len = self.state.collation_data.in_msgs.len() as u64;
        let out_msgs_len = self.state.collation_data.out_msgs.len() as u64;
        self.extra.finalize_wu.calculate_finalize_block_wu(
            &self.state.collation_config.work_units_params.finalize,
            &self.state.collation_config.work_units_params.execute,
            shard_accounts_count,
            updated_accounts_count,
            in_msgs_len,
            out_msgs_len,
        );
        tracing::debug!(target: tracing_targets::COLLATOR,
            "finalize_block_wu: {}, state_accounts_count: {}, \
            updated_accounts_count: {}, in_msgs: {}, out_msgs: {} ",
            self.extra.finalize_wu.finalize_block_wu(),
            shard_accounts_count, updated_accounts_count,
            in_msgs_len, out_msgs_len,
        );
        self.state.do_collate_wu.calculate_resume_collation_wu(
            &self.state.collation_config.work_units_params.finalize,
            &self.state.collation_config.work_units_params.execute,
            shard_accounts_count,
            updated_accounts_count,
            &self.state.mc_data,
            &shard,
        );
        let blocks_count_between_masters =
            self.state.mc_data.get_blocks_count_between_masters(&shard);
        tracing::debug!(target: tracing_targets::COLLATOR,
            "resume_collation_wu: {}, state_accounts_count: {}, \
            blocks_count_between_masters: {} ",
            self.state.do_collate_wu.resume_collation_wu,
            shard_accounts_count,
            blocks_count_between_masters,
        );
        if !shard.is_masterchain() && self.state.is_first_block_after_prev_master {
            metrics::gauge!("tycho_shard_blocks_count_in_last_master_block", labels)
                .set(blocks_count_between_masters as f64);
        }

        // Accumulate wu used since the last anchor (saturating), then cap
        // it by a limit derived from the max consensus lag.
        let mut new_wu_used_from_last_anchor = wu_used_from_last_anchor
            .saturating_add(self.extra.execute_result.prepare_msg_groups_wu.total_wu())
            .saturating_add(self.extra.execute_result.execute_wu.total_wu())
            .saturating_add(self.extra.finalize_wu.total_wu());
        if self.state.is_first_block_after_prev_master {
            new_wu_used_from_last_anchor = new_wu_used_from_last_anchor
                .saturating_add(self.state.do_collate_wu.resume_collation_wu);
        }
        let max_consensus_lag_rounds = self
            .state
            .mc_data
            .config
            .get_consensus_config()?
            .max_consensus_lag_rounds
            .get() as u64;
        let wu_used_to_import_next_anchor =
            self.state.collation_config.wu_used_to_import_next_anchor;
        let max_wu_used_limit = max_consensus_lag_rounds
            .saturating_div(4)
            .saturating_mul(wu_used_to_import_next_anchor);
        tracing::info!(target: tracing_targets::COLLATOR,
            "wu_used_from_last_anchor update: old={}, new={}, max_limit={}, \
            read_msg_groups_wu_total={}, execute_groups_wu_total={}, finalize_wu_total={}",
            wu_used_from_last_anchor,
            new_wu_used_from_last_anchor,
            max_wu_used_limit,
            self.extra.execute_result.prepare_msg_groups_wu.total_wu(),
            self.extra.execute_result.execute_wu.total_wu(),
            self.extra.finalize_wu.total_wu(),
        );
        new_wu_used_from_last_anchor = new_wu_used_from_last_anchor.min(max_wu_used_limit);

        // Build the new unsplit shard state. Note that `overload_history`
        // carries the wu-used value and `underload_history` the accounts
        // count (mirroring the read of `underload_history` above).
        let mut new_observable_state = Box::new(ShardStateUnsplit {
            global_id: self.state.mc_data.global_id,
            shard_ident: new_block_info.shard,
            seqno: new_block_info.seqno,
            vert_seqno: 0, gen_utime: new_block_info.gen_utime,
            gen_utime_ms: new_block_info.gen_utime_ms,
            gen_lt: new_block_info.end_lt,
            min_ref_mc_seqno: new_block_info.min_ref_mc_seqno,
            processed_upto: Lazy::new(&processed_upto.clone().try_into()?)?,
            before_split: new_block_info.before_split, accounts: Lazy::new(&processed_accounts.shard_accounts)?,
            overload_history: new_wu_used_from_last_anchor,
            underload_history: shard_accounts_count,
            total_balance: value_flow.to_next_block.clone(),
            total_validator_fees: self.state.prev_shard_data.total_validator_fees().clone(),
            libraries: Dict::new(),
            master_ref,
            custom: mc_state_extra.as_ref().map(Lazy::new).transpose()?,
        });
        // total_validator_fees = prev fees + collected - recovered.
        new_observable_state
            .total_validator_fees
            .try_add_assign(&value_flow.fees_collected)?;
        new_observable_state
            .total_validator_fees
            .try_sub_assign(&value_flow.recovered)?;
        if is_masterchain {
            new_observable_state.libraries = public_libraries.clone();
        }
        new_state_root = CellBuilder::build_from(&new_observable_state)?;

        // Collect the accounts-dict split boundaries of the old and new
        // states; they are handed to `create_merkle_update`.
        let old_split_at = split_aug_dict_raw(
            self.state.prev_shard_data.observable_accounts().clone(),
            merkle_split_depth,
        )?
        .into_keys()
        .collect::<ahash::HashSet<_>>();
        let new_split_at =
            split_aug_dict_raw(new_observable_state.accounts.load()?, merkle_split_depth)?
                .into_keys()
                .collect::<ahash::HashSet<_>>();
        total_validator_fees = new_observable_state.total_validator_fees.clone();
        let merkle_update = create_merkle_update(
            &shard,
            self.state.prev_shard_data.pure_state_root(),
            &new_state_root,
            usage_tree,
            old_split_at,
            new_split_at,
        )?;
        self.extra.finalize_metrics.build_state_update_elapsed = histogram.finish();
        (merkle_update, new_observable_state)
    };

    // Assemble the block body and serialize it to BOC.
    let (new_block, new_block_extra, new_mc_block_extra) = {
        let histogram = HistogramGuard::begin_with_labels(
            "tycho_collator_finalize_build_block_time_high",
            labels,
        );
        let mut new_block_extra = BlockExtra {
            in_msg_description: Lazy::new(&in_msgs)?,
            out_msg_description: Lazy::new(&out_msgs)?,
            account_blocks: Lazy::new(&processed_accounts.account_blocks)?,
            rand_seed: self.state.collation_data.rand_seed,
            created_by: self.state.collation_data.created_by,
            ..Default::default()
        };
        let new_mc_block_extra = if let Some(mc_state_extra) = &mc_state_extra {
            let new_mc_block_extra = McBlockExtra {
                shards: mc_state_extra.shards.clone(),
                fees: self.state.collation_data.shard_fees.clone(),
                prev_block_signatures: Default::default(),
                mint_msg: self
                    .state
                    .collation_data
                    .mint_msg
                    .as_ref()
                    .map(Lazy::new)
                    .transpose()?,
                recover_create_msg: self
                    .state
                    .collation_data
                    .recover_create_msg
                    .as_ref()
                    .map(Lazy::new)
                    .transpose()?,
                // Config is embedded into the block extra only on key blocks.
                config: if mc_state_extra.after_key_block {
                    Some(mc_state_extra.config.clone())
                } else {
                    None
                },
            };
            new_block_extra.custom = Some(Lazy::new(&new_mc_block_extra)?);
            Some(new_mc_block_extra)
        } else {
            None
        };
        self.state.collation_data.diff_tail_len = diff_tail_len;
        let block = Block {
            global_id: self.state.mc_data.global_id,
            info: Lazy::new(&new_block_info)?,
            value_flow: Lazy::new(&value_flow)?,
            state_update: Lazy::new(&state_update)?,
            out_msg_queue_updates: OutMsgQueueUpdates {
                diff_hash: *queue_diff.hash(),
                tail_len: diff_tail_len,
            },
            extra: Lazy::new(&new_block_extra)?,
        };
        let root = CellBuilder::build_from(&block)?;

        // Serialize the block reusing the cached BOC header buffers, put
        // the cache back, and report its capacities as gauges.
        let mut data = Vec::new();
        {
            let header = boc::ser::BocHeader::<ahash::RandomState>::with_root_and_cache(
                root.as_ref(),
                block_serializer_cache.take_boc_header_cache(),
            );
            header.encode_rayon(&mut data);
            let cache = header.into_cache();
            let rev_indices_capacity = cache.rev_indices_capacity();
            let rev_cells_capacity = cache.rev_cells_capacity();
            block_serializer_cache.set_boc_header_cache(cache);
            metrics::gauge!("tycho_collator_boc_cache_rev_indices_capacity", labels)
                .set(rev_indices_capacity as f64);
            metrics::gauge!("tycho_collator_boc_cache_rev_cells_capacity", labels)
                .set(rev_cells_capacity as f64);
        };
        let block_id = BlockId {
            shard,
            seqno: self.state.collation_data.block_id_short.seqno,
            root_hash: *root.repr_hash(),
            file_hash: Boc::file_hash_blake(&data),
        };
        let block = BlockStuff::from_block_and_root(&block_id, block, root, data.len());
        self.extra.finalize_metrics.build_block_elapsed = histogram.finish();
        (
            WithArchiveData::new(block, data),
            new_block_extra,
            new_mc_block_extra,
        )
    };
    let new_block_id = *new_block.id();
    tracing::info!(target: tracing_targets::COLLATOR,
        "collated_block_info: {:?}",
        BlockDebugInfo {
            block_id: &new_block_id,
            block_info: &new_block_info,
            prev_ref: &prev_ref,
            state: &new_observable_state,
            processed_upto: &processed_upto,
            out_msg_queue_updates: &new_block.block().out_msg_queue_updates,
            value_flow: &value_flow,
            mc_top_shards: self.state.collation_data.shards(),
            mc_state_extra: mc_state_extra.as_ref(),
            merkle_update_hash: new_block.block().state_update.repr_hash(),
            merkle_update: &state_update,
            block_extra: &new_block_extra,
            mc_block_extra: new_mc_block_extra.as_ref(),
        },
    );
    let processed_to_anchor_id = processed_upto.get_min_externals_processed_to()?.0;

    // For masterchain blocks, assemble the new McData from the state extra.
    let new_mc_data = match mc_state_extra {
        None => None,
        Some(extra) => {
            let prev_key_block_seqno = if extra.after_key_block {
                new_block_id.seqno
            } else if let Some(block_ref) = &extra.last_key_block {
                block_ref.seqno
            } else {
                0
            };
            let shards = extra.shards.as_vec()?;
            let top_processed_to_anchor = detect_top_processed_to_anchor(
                shards.iter().map(|(_, d)| *d),
                processed_to_anchor_id,
            );
            Some(Arc::new(McData {
                global_id: new_block.as_ref().global_id,
                block_id: *new_block.id(),
                prev_key_block_seqno,
                gen_lt: new_block_info.end_lt,
                gen_chain_time: self.state.collation_data.get_gen_chain_time(),
                libraries: public_libraries,
                total_validator_fees,
                global_balance: extra.global_balance.clone(),
                shards,
                config: extra.config,
                validator_info: extra.validator_info,
                consensus_info: extra.consensus_info,
                processed_upto: processed_upto.clone(),
                top_processed_to_anchor,
                ref_mc_state_handle: self.state.prev_shard_data.ref_mc_state_handle().clone(),
                shards_processed_to_by_partitions: self
                    .state
                    .collation_data
                    .mc_shards_processed_to_by_partitions
                    .clone(),
                prev_mc_data: Some(PrevMcData {
                    shards: self.state.mc_data.shards.clone(),
                }),
            }))
        }
    };

    // Assemble the block candidate.
    let block_candidate = Box::new(BlockCandidate {
        ref_by_mc_seqno,
        block: new_block,
        is_key_block: new_block_info.key_block,
        consensus_config_changed: self.state.collation_data.consensus_config_changed,
        prev_blocks_ids: self.state.prev_shard_data.blocks_ids().clone(),
        top_shard_blocks_ids: self
            .state
            .collation_data
            .top_shard_blocks
            .iter()
            .map(|b| b.block_id)
            .collect(),
        collated_file_hash: HashBytes::ZERO,
        chain_time: self.state.collation_data.get_gen_chain_time(),
        processed_to_anchor_id,
        value_flow,
        created_by: self.state.collation_data.created_by,
        queue_diff_aug: queue_diff.build(&new_block_id),
        // Prefer the fresh consensus info when this is a masterchain block.
        consensus_info: new_mc_data.as_ref().map_or_else(
            || self.state.mc_data.consensus_info,
            |mcd| mcd.consensus_info,
        ),
        processed_upto,
    });
    self.extra.finalize_metrics.finalize_block_elapsed = histogram.finish();
    Ok((
        FinalizeBlockResult {
            collation_data: self.state.collation_data,
            block_candidate,
            mc_data: new_mc_data,
            state_update,
            new_state_root,
            new_observable_state,
            finalize_wu: self.extra.finalize_wu,
            finalize_metrics: self.extra.finalize_metrics,
            do_collate_wu: self.state.do_collate_wu,
            old_mc_data: self.state.mc_data,
            collation_config: self.state.collation_config,
        },
        self.extra.execute_result,
    ))
}
/// Builds the masterchain state extra for a new masterchain block.
///
/// Determines whether the block is a key block (blockchain config change
/// or a superseding mempool genesis override), updates consensus and
/// validator info accordingly, refreshes the shard config and the
/// prev-blocks dict, and recomputes the global balance.
///
/// Returns the new [`McStateExtra`] together with the minimal
/// `min_ref_mc_seqno` across top shard descriptions.
fn create_mc_state_extra(
    collation_data: &mut BlockCollationData,
    processed_to_anchor: u32,
    config_params: Option<BlockchainConfig>,
    prev_state: &ShardStateStuff,
    prev_processed_to_anchor: u32,
    collator_config: Arc<CollatorConfig>,
    zerostate_id: ZerostateId,
) -> Result<(McStateExtra, u32)> {
    let prev_state_extra = prev_state.state_extra()?;
    let prev_config = &prev_state_extra.config;

    // A changed config makes this a key block; optionally validate it.
    let (config, mut is_key_block) = if let Some(new_config) = config_params {
        let is_key_block = &new_config != prev_config;
        if is_key_block && collator_config.validate_config {
            new_config
                .validate_params()
                .context("invalid blockchain config")?;
        }
        (new_config, is_key_block)
    } else {
        (prev_config.clone(), false)
    };

    // A mempool genesis override that supersedes the current genesis also
    // forces a key block.
    let mut consensus_info = prev_state_extra.consensus_info;
    if let Some(mp_cfg_override) = &collation_data.mempool_config_override
        && (mp_cfg_override.genesis_info).overrides(&consensus_info.genesis_info)
    {
        consensus_info.genesis_info = mp_cfg_override.genesis_info;
        is_key_block = true;
    }
    let collation_config = config.get_collation_config()?;
    let workchains = config.get_workchains()?;
    let update_shard_cc = is_key_block;
    let min_ref_mc_seqno =
        Self::update_shard_config(collation_data, &workchains, update_shard_cc)?;
    let shards_iter = collation_data
        .get_shards()?
        .iter()
        .map(|(k, v)| (k, v.as_ref()));
    let shards = ShardHashes::from_shards(shards_iter)?;

    // On a key block, recompute validator session info.
    let mut validator_info = None;
    if is_key_block {
        let prev_vset = prev_config.get_current_validator_set_raw()?;
        let current_vset = config.get_current_validator_set_raw()?;
        let prev_shuffle_mc_validators =
            prev_config.get_collation_config()?.shuffle_mc_validators;
        let prev_consensus_config = prev_config.get_consensus_config()?;
        let is_consensus_config_changed =
            prev_consensus_config != config.get_consensus_config()?;
        collation_data.consensus_config_changed = Some(is_consensus_config_changed);
        // The current vset switch counts as applied once its round is at or
        // below the previously processed anchor.
        let is_curr_switch_applied =
            consensus_info.vset_switch_round <= prev_processed_to_anchor;
        // Pick the start round of the next validation session.
        let next_session_start_round = if consensus_info != prev_state_extra.consensus_info {
            // Genesis was just overridden above: restart right after it.
            consensus_info.genesis_info.start_round + 1
        } else if is_consensus_config_changed {
            // Consensus config changed: start a new genesis at the current
            // processed anchor.
            consensus_info.genesis_info = GenesisInfo {
                start_round: processed_to_anchor,
                genesis_millis: collation_data.get_gen_chain_time() + 1,
            };
            consensus_info.genesis_info.start_round + 1
        } else {
            let last_round_to_create = prev_processed_to_anchor
                + prev_consensus_config.max_consensus_lag_rounds.get() as u32;
            if !is_curr_switch_applied {
                (last_round_to_create + 1).max(consensus_info.vset_switch_round)
            } else if consensus_info.vset_switch_round > consensus_info.genesis_info.start_round
            {
                let full_history_round =
                    consensus_info.vset_switch_round + prev_consensus_config.max_total_rounds();
                last_round_to_create.max(full_history_round) + 1
            } else {
                last_round_to_create + 1
            }
        };
        // Rotate the masterchain subset when the vset or shuffle flag changed.
        if current_vset.repr_hash() != prev_vset.repr_hash()
            || collation_config.shuffle_mc_validators != prev_shuffle_mc_validators
        {
            if is_curr_switch_applied {
                consensus_info.prev_shuffle_mc_validators = prev_shuffle_mc_validators;
                consensus_info.prev_vset_switch_round = consensus_info.vset_switch_round;
            }
            consensus_info.vset_switch_round = next_session_start_round;
            let current_vset = current_vset.parse::<ValidatorSet>()?;
            let (_, validator_list_hash_short) = current_vset
                .compute_mc_subset(
                    next_session_start_round,
                    collation_config.shuffle_mc_validators,
                )
                .ok_or_else(|| {
                    anyhow!(
                        "Error calculating subset of validators for next session \
                        (shard_id = {}, session_seqno = {})",
                        ShardIdent::MASTERCHAIN,
                        next_session_start_round,
                    )
                })?;
            validator_info = Some(ValidatorInfo {
                validator_list_hash_short,
                catchain_seqno: next_session_start_round,
                nx_cc_updated: true,
            });
        }
    }
    // No rotation: keep previous validator info with the flag reset.
    let validator_info = validator_info.unwrap_or(ValidatorInfo {
        nx_cc_updated: false,
        ..prev_state_extra.validator_info
    });

    // The first block after zerostate counts as following a key block.
    let prev_is_key_block = collation_data.block_id_short.seqno == zerostate_id.seqno + 1 || prev_state_extra.after_key_block;
    let mut prev_blocks = prev_state_extra.prev_blocks.clone();
    let prev_blk_ref = BlockRef {
        end_lt: prev_state.state().gen_lt,
        seqno: prev_state.block_id().seqno,
        root_hash: prev_state.block_id().root_hash,
        file_hash: prev_state.block_id().file_hash,
    };
    // Record the previous block in the prev-blocks dict unless omitting
    // master block history is enabled (key blocks are always recorded).
    if prev_is_key_block
        || !prev_state_extra
            .config
            .get_global_version()?
            .capabilities
            .contains(GlobalCapability::CapOmitMasterBlockHistory)
    {
        prev_blocks.set(
            prev_state.block_id().seqno,
            KeyMaxLt {
                has_key_block: prev_is_key_block,
                max_end_lt: prev_state.state().gen_lt,
            },
            &KeyBlockRef {
                is_key_block: prev_is_key_block,
                block_ref: prev_blk_ref.clone(),
            },
        )?;
    }
    let last_key_block = if prev_state_extra.after_key_block {
        Some(prev_blk_ref)
    } else {
        prev_state_extra.last_key_block.clone()
    };

    // global_balance += created + minted - burned + shard-fees create.
    let mut global_balance = prev_state_extra.global_balance.clone();
    global_balance.try_add_assign(&collation_data.value_flow.created)?;
    global_balance.try_add_assign(&collation_data.value_flow.minted)?;
    global_balance.try_sub_assign(&collation_data.value_flow.burned)?;
    global_balance.try_add_assign(&collation_data.shard_fees.root_extra().create)?;

    // Block creator stats are maintained only with the feature enabled and
    // the corresponding capability set in the config.
    #[cfg(not(feature = "block-creator-stats"))]
    let block_create_stats = None;
    #[cfg(feature = "block-creator-stats")]
    let block_create_stats = if prev_state_extra
        .config
        .get_global_version()?
        .capabilities
        .contains(GlobalCapability::CapCreateStatsEnabled)
    {
        let mut stats = prev_state_extra
            .block_create_stats
            .clone()
            .unwrap_or_default();
        Self::update_block_creator_stats(collation_data, &mut stats)?;
        Some(stats)
    } else {
        None
    };
    let mc_state_extra = McStateExtra {
        shards,
        config,
        validator_info,
        consensus_info,
        prev_blocks,
        after_key_block: is_key_block,
        last_key_block,
        block_create_stats,
        global_balance,
    };
    Ok((mc_state_extra, min_ref_mc_seqno))
}
/// Computes the minimal `min_ref_mc_seqno` across all top shard
/// descriptions in the collation data.
///
/// Returns `u32::MAX` when there are no shard descriptions.
/// `_wc_set` and `_update_cc` are accepted for interface compatibility but
/// currently unused.
fn update_shard_config(
    collation_data: &mut BlockCollationData,
    _wc_set: &Dict<i32, WorkchainDescription>,
    _update_cc: bool,
) -> Result<u32> {
    let min_ref_mc_seqno = collation_data
        .get_shards_mut()?
        .values_mut()
        .map(|shard_descr| shard_descr.min_ref_mc_seqno)
        .fold(u32::MAX, std::cmp::min);
    Ok(min_ref_mc_seqno)
}
/// Builds the updated shard accounts dict and the account blocks from the
/// executor's accounts cache.
///
/// Also extracts new config params when the masterchain config account's
/// data changed, merges public library diffs (masterchain only), and
/// counts added/updated/removed accounts.
fn build_accounts(
    executor: MessagesExecutor,
    config_address: &HashBytes,
    split_depth: u8,
    public_libraries: &mut Dict<HashBytes, LibDescr>,
) -> Result<ProcessedAccounts> {
    let shard_ident = executor.shard_id();
    let labels = [("workchain", shard_ident.workchain().to_string())];
    let is_masterchain = shard_ident.is_masterchain();
    let workchain = shard_ident.workchain();
    let mut new_config_params = None;
    let (updated_accounts, shard_accounts) = executor.into_accounts_cache_raw();
    // The accounts dict is split into 2^split_depth subtrees so that
    // modifications can be applied in parallel; no split on masterchain.
    let split_depth = if is_masterchain { 0 } else { split_depth };
    let shard_count = 2usize.pow(split_depth as _);
    let mut account_modifications = FastHashMap::<ShardIdent, Vec<_>>::with_capacity_and_hasher(
        shard_count,
        Default::default(),
    );
    let mut public_libraries_diff = PublicLibsDiff::new(public_libraries.clone());
    let mut added_accounts_count = 0;
    let mut removed_accounts_count = 0;
    let histogram = HistogramGuard::begin_with_labels(
        "tycho_collator_finalize_update_shard_accounts_time_high",
        &labels,
    );
    // Pass 1 (sequential): group per-subtree modifications, pick up config
    // changes and public libs diffs, count added/removed accounts.
    let updated_accounts = updated_accounts
        // Only accounts with at least one transaction participate.
        .filter(|account| !account.transactions.is_empty())
        .map(|mut updated_account| {
            if is_masterchain {
                // If the config account's data changed, parse the new
                // config params out of its data cell.
                if &updated_account.account_addr == config_address
                    && let Some(Account {
                        state:
                            AccountState::Active(StateInit {
                                data: Some(data), ..
                            }),
                        ..
                    }) = updated_account
                        .shard_account
                        .load_account()
                        .context("failed to load account")?
                {
                    new_config_params = Some(
                        data.parse::<BlockchainConfigParams>()
                            .context("failed to parse config params")?,
                    );
                }
                public_libraries_diff
                    .merge(
                        &updated_account.account_addr,
                        std::mem::take(&mut updated_account.public_libs_diff),
                    )
                    .context("failed to add public libraries diff")?;
            }
            if updated_account.exists == AccountExistence::Created {
                added_accounts_count += 1;
            } else if updated_account.exists == AccountExistence::Removed {
                removed_accounts_count += 1;
            }
            // `Some` inserts/updates the account entry, `None` removes it.
            let op = match updated_account.exists {
                AccountExistence::Exists | AccountExistence::Created => {
                    let extra = DepthBalanceInfo {
                        split_depth: 0,
                        balance: updated_account.balance.clone(),
                    };
                    let value = updated_account.shard_account.clone();
                    Some((extra, value))
                }
                AccountExistence::NotExists | AccountExistence::Removed => None,
            };
            // Route the modification to its subtree by address prefix.
            let shard_ident =
                shard_ident_at_depth(workchain, &updated_account.account_addr, split_depth);
            account_modifications
                .entry(shard_ident)
                .or_default()
                .push((updated_account.account_addr, op));
            Ok(updated_account)
        })
        .collect::<Result<Vec<_>>>()?;
    // Sort each subtree's modifications by address — required by
    // `modify_with_sorted_iter` below.
    account_modifications
        .par_iter_mut()
        .for_each(|(_, items)| items.sort_by(|(a, _), (b, _)| a.cmp(b)));
    // Pass 2 (parallel): split the accounts dict, apply modifications per
    // subtree, then merge the subtrees back into one dict.
    let shard_accounts = {
        let split_shard_accounts = split_aug_dict(workchain, shard_accounts, split_depth)?;
        let split_shard_accounts_with_updates = split_shard_accounts
            .into_iter()
            .map(|(shard, shard_accounts)| {
                let account_updates = account_modifications.remove(&shard);
                (shard, (shard_accounts, account_updates))
            })
            .collect::<BTreeMap<_, _>>();
        let updated_shard_accounts = split_shard_accounts_with_updates
            .into_par_iter()
            .map(|(shard, (shard_accounts, account_updates))| {
                let mut shard_accounts = RelaxedAugDict::from_full(&shard_accounts);
                if let Some(updates) = account_updates {
                    shard_accounts
                        .modify_with_sorted_iter(updates)
                        .context("failed to modify accounts dict")?;
                }
                Ok((shard, shard_accounts))
            })
            .collect::<Result<BTreeMap<_, _>>>()?;
        merge_relaxed_aug_dicts(updated_shard_accounts)?
    };
    let update_shard_accounts_elapsed = histogram.finish();
    // Build account blocks (one per updated account) in parallel.
    let histogram = HistogramGuard::begin_with_labels(
        "tycho_collator_finalize_build_accounts_blocks_time_high",
        &labels,
    );
    let account_blocks = updated_accounts
        .into_par_iter()
        .map(|updated_account| {
            let transactions = AugDict::try_from_btree(&updated_account.transactions)
                .context("failed to build account block transactions")?;
            Ok::<_, anyhow::Error>(AccountBlock {
                state_update: updated_account.build_hash_update(),
                account: updated_account.account_addr,
                transactions,
            })
        })
        .collect_vec_list();
    // Re-key by account address; the BTreeMap restores sorted order for
    // `try_from_sorted_iter_any` below.
    let account_blocks = account_blocks
        .into_iter()
        .flat_map(move |chunk| {
            chunk.into_iter().map(|res| {
                let account_block = res?;
                Ok((account_block.account, account_block))
            })
        })
        .collect::<Result<BTreeMap<_, _>>>()?;
    let updated_accounts_count = account_blocks.len() as u64;
    let account_blocks = RelaxedAugDict::try_from_sorted_iter_any(
        account_blocks
            .into_iter()
            .map(|(k, v)| (k, v.transactions.root_extra().clone(), v)),
    )?;
    let build_accounts_blocks_elapsed = histogram.finish();
    // Publish the accumulated public libraries diff back to the caller.
    *public_libraries = public_libraries_diff
        .finalize()
        .context("failed to finalize public libraries dict")?;
    Ok(ProcessedAccounts {
        account_blocks: account_blocks.build()?,
        shard_accounts,
        new_config_params,
        updated_accounts_count,
        added_accounts_count,
        removed_accounts_count,
        update_shard_accounts_elapsed,
        build_accounts_blocks_elapsed,
    })
}
/// Builds the inbound message description (`InMsgDescr`) for the block.
///
/// The `BTreeMap` keys are already sorted by message hash, so the augmented
/// dict can be assembled directly from the sorted iterator without resorting.
fn build_in_msgs(items: &BTreeMap<HashBytes, PreparedInMsg>) -> Result<InMsgDescr> {
    let sorted = items
        .iter()
        .map(|(msg_id, msg)| (msg_id, &msg.import_fees, &msg.in_msg));
    let descr = RelaxedAugDict::try_from_sorted_iter_lazy(sorted)?;
    Ok(descr.build()?)
}
/// Builds the outbound message description (`OutMsgDescr`) for the block.
///
/// The `BTreeMap` keys are already sorted by message hash, so the augmented
/// dict can be assembled directly from the sorted iterator without resorting.
fn build_out_msgs(items: &BTreeMap<HashBytes, PreparedOutMsg>) -> Result<OutMsgDescr> {
    let sorted = items
        .iter()
        .map(|(msg_id, msg)| (msg_id, &msg.exported_value, &msg.out_msg));
    let descr = RelaxedAugDict::try_from_sorted_iter_lazy(sorted)?;
    Ok(descr.build()?)
}
/// Refreshes per-creator block production counters in `block_create_stats`:
/// one entry per shard-block creator recorded during this collation, an entry
/// for this block's creator (`created_by`) if it produced no shard blocks,
/// and the zero-key entry holding aggregated totals.
#[cfg(feature = "block-creator-stats")]
fn update_block_creator_stats(
    collation_data: &BlockCollationData,
    block_create_stats: &mut Dict<HashBytes, CreatorStats>,
) -> Result<()> {
    let now = collation_data.gen_utime;
    // Both windowed counters (cnt2048 / cnt65536) receive the same value.
    let counters = |total, windowed| BlockCounters {
        updated_at: now,
        total,
        cnt2048: windowed,
        cnt65536: windowed,
    };

    let mut creator_seen = false;
    for (creator, count) in &collation_data.block_create_count {
        // Credit one masterchain block to the creator of this very block.
        let mc_count = if collation_data.created_by == *creator {
            creator_seen = true;
            1
        } else {
            0
        };
        block_create_stats.set(creator, CreatorStats {
            mc_blocks: counters(mc_count, mc_count),
            // Shard counters are stored pre-scaled by 2^32.
            shard_blocks: counters(*count, count << 32),
        })?;
    }

    // The creator of this block may not appear among the shard-block creators;
    // make sure it still receives a masterchain counter entry.
    if !creator_seen {
        block_create_stats.set(collation_data.created_by, CreatorStats {
            mc_blocks: counters(1, 1),
            shard_blocks: counters(0, 0),
        })?;
    }

    // The zero key aggregates totals across all creators.
    let total_shard_blocks = collation_data.block_create_count.values().sum();
    block_create_stats.set(HashBytes::default(), CreatorStats {
        mc_blocks: counters(1, 1),
        shard_blocks: counters(total_shard_blocks, total_shard_blocks << 32),
    })?;

    Ok(())
}
/// Cross-checks the block's declared `ValueFlow` against the data actually
/// produced during collation.
///
/// Verifies, in order:
/// - `recovered`, `minted` and the matching special in-messages are consistent,
///   and both values (as well as `burned` and `fees_imported`) are only
///   non-zero in a masterchain block;
/// - `created` equals the configured block creation reward, scaled down by
///   the shard prefix length;
/// - `from_prev_block` / `to_next_block` match the total balances of the
///   previous / new account states;
/// - `imported` / `exported` match the root augmentations of `InMsgDescr` /
///   `OutMsgDescr`;
/// - `fees_collected` equals transaction fees + in-message fees +
///   `fees_imported` + `created`;
/// - the flow as a whole balances (`ValueFlow::validate`).
///
/// The order of the checks is significant: the first failing `ensure!`
/// determines the returned error.
fn check_value_flow(
    value_flow: &ValueFlow,
    collation_data: &BlockCollationData,
    config: &BlockchainConfig,
    prev_shard_data: &PrevData,
    in_msgs: &InMsgDescr,
    out_msgs: &OutMsgDescr,
    processed_accounts: &ProcessedAccounts,
) -> Result<()> {
    let is_masterchain = collation_data.block_id_short.is_masterchain();

    // Recovered fees must be consistent with the presence of the fee
    // recovery in-message, and may only be non-zero in the masterchain.
    if value_flow.recovered.is_zero() {
        anyhow::ensure!(
            collation_data.recover_create_msg.is_none(),
            "ValueFlow of block {} has no recovered fees, \
            but there is a recovery InMsg",
            collation_data.block_id_short
        );
    } else {
        anyhow::ensure!(
            is_masterchain,
            "ValueFlow of block {} is invalid \
            (non-zero recovered value in a non-masterchain block)",
            collation_data.block_id_short
        );
        anyhow::ensure!(
            collation_data.recover_create_msg.is_some(),
            "ValueFlow of block {} has recovered fees, \
            but there is no recovery InMsg",
            collation_data.block_id_short
        );
    }

    // Same consistency rules for the minted value and the mint in-message.
    if value_flow.minted.is_zero() {
        anyhow::ensure!(
            collation_data.mint_msg.is_none(),
            "ValueFlow of block {} has a zero minted value, \
            but there is a mint InMsg",
            collation_data.block_id_short
        );
    } else {
        anyhow::ensure!(
            is_masterchain,
            "ValueFlow of block {} is invalid \
            (non-zero minted value in a non-masterchain block)",
            collation_data.block_id_short
        );
        anyhow::ensure!(
            collation_data.mint_msg.is_some(),
            "ValueFlow of block {} has a non-zero minted value, \
            but there is no mint InMsg",
            collation_data.block_id_short
        );
    }

    // Burning only happens in the masterchain.
    anyhow::ensure!(
        is_masterchain || value_flow.burned.is_zero(),
        "ValueFlow of block {} is invalid \
        (non-zero burned in a non-masterchain block)",
        collation_data.block_id_short
    );

    // The creation reward is halved for each level of shard splitting
    // (shifted right by the shard prefix length).
    let mut created = config
        .get_block_creation_reward(is_masterchain)
        .unwrap_or_default();
    created >>= collation_data.block_id_short.shard.prefix_len() as u8;
    anyhow::ensure!(
        value_flow.created == CurrencyCollection::from(created),
        "ValueFlow of block {} declares block creation reward {}, \
        but the current configuration expects it to be {}",
        collation_data.block_id_short,
        value_flow.created.tokens,
        created
    );

    // Fees are only imported into masterchain blocks.
    anyhow::ensure!(
        is_masterchain || value_flow.fees_imported.is_zero(),
        "ValueFlow of block {} is invalid \
        (non-zero fees_imported in a non-masterchain block)",
        collation_data.block_id_short
    );

    // Total balance of the previous state must match `from_prev_block`.
    let from_prev_block = &prev_shard_data.observable_accounts().root_extra().balance;
    anyhow::ensure!(
        value_flow.from_prev_block == *from_prev_block,
        "ValueFlow for {} declares from_prev_block={}, \
        but the total balance present in the previous state is {}",
        collation_data.block_id_short,
        value_flow.from_prev_block.tokens,
        from_prev_block.tokens
    );

    // Total balance of the new state must match `to_next_block`.
    let to_next_block = &processed_accounts.shard_accounts.root_extra().balance;
    anyhow::ensure!(
        value_flow.to_next_block == *to_next_block,
        "ValueFlow for {} declares to_next_block={}, \
        but the total balance present in the new state is {}",
        collation_data.block_id_short,
        value_flow.to_next_block.tokens,
        to_next_block.tokens
    );

    // Imported value must match the InMsgDescr root augmentation.
    let imported = &in_msgs.root_extra().value_imported;
    anyhow::ensure!(
        value_flow.imported == *imported,
        "ValueFlow for {} declares imported={}, \
        but the total inbound messages value from InMsgDescr is {}",
        collation_data.block_id_short,
        value_flow.imported.tokens,
        imported.tokens
    );

    // Exported value must match the OutMsgDescr root augmentation.
    let exported = out_msgs.root_extra();
    anyhow::ensure!(
        value_flow.exported == *exported,
        "ValueFlow for {} declares exported={}, \
        but the total outbound messages value from OutMsgDescr is {}",
        collation_data.block_id_short,
        value_flow.exported.tokens,
        exported.tokens
    );

    // fees_collected = transaction fees + in-message fees
    //                  + fees_imported + created.
    let transaction_fees = processed_accounts.account_blocks.root_extra();
    let in_msgs_fees = in_msgs.root_extra().fees_collected;
    let mut fees_collected = transaction_fees.clone();
    fees_collected.try_add_assign_tokens(in_msgs_fees)?;
    fees_collected.try_add_assign_tokens(value_flow.fees_imported.tokens)?;
    fees_collected.try_add_assign_tokens(value_flow.created.tokens)?;
    anyhow::ensure!(
        value_flow.fees_collected == fees_collected,
        "ValueFlow for {} declares fees_collected={}, \
        but the computed amount is {}, where \
        transaction_fees={}, in_msgs_fees={}, fees_imported={} and created={}",
        collation_data.block_id_short,
        value_flow.fees_collected.tokens,
        fees_collected.tokens,
        transaction_fees.tokens,
        in_msgs_fees,
        value_flow.fees_imported.tokens,
        value_flow.created.tokens,
    );

    // Finally, the overall flow must balance.
    if !value_flow.validate()? {
        anyhow::bail!(
            "ValueFlow for {} \
            is invalid (incoming flow != outgoing flow): {:?}",
            collation_data.block_id_short,
            value_flow,
        )
    }

    Ok(())
}
}
/// Aggregated outcome of applying executed account updates while finalizing
/// a block: the new accounts state, the per-account transaction dict, and
/// bookkeeping counters/timings.
#[derive(Default)]
struct ProcessedAccounts {
    // `AccountBlocks` dict for the block: per-account transaction lists.
    account_blocks: AccountBlocks,
    // Updated accounts dictionary that goes into the new shard state.
    shard_accounts: ShardAccounts,
    // New blockchain config params, if any were produced while processing
    // accounts (NOTE(review): presumably only when the config account changed
    // in a masterchain block — confirm at the construction site).
    new_config_params: Option<BlockchainConfigParams>,
    // Number of accounts with at least one transaction in this block
    // (the length of `account_blocks`).
    updated_accounts_count: u64,
    // Accounts created / deleted during this block.
    added_accounts_count: u64,
    removed_accounts_count: u64,
    // Time spent updating the split shard accounts dicts.
    update_shard_accounts_elapsed: Duration,
    // Time spent building the account blocks dict.
    build_accounts_blocks_elapsed: Duration,
}
/// Builds the Merkle update between the old and new shard state roots,
/// using the recorded `usage_tree` to limit it to the visited subtrees.
/// The build is parallelized across the provided split points; the elapsed
/// time is reported via a per-workchain histogram and a debug log line.
fn create_merkle_update(
    shard_id: &ShardIdent,
    old_state_root: &Cell,
    new_state_root: &Cell,
    usage_tree: UsageTree,
    old_split_at: ahash::HashSet<HashBytes>,
    new_split_at: ahash::HashSet<HashBytes>,
) -> Result<MerkleUpdate> {
    let labels = [("workchain", shard_id.workchain().to_string())];
    let timer = HistogramGuard::begin_with_labels(
        "tycho_collator_create_merkle_update_time_high",
        &labels,
    );

    let state_update =
        MerkleUpdate::create(old_state_root.as_ref(), new_state_root.as_ref(), usage_tree)
            .par_build(old_split_at, new_split_at)?;

    let elapsed = timer.finish();
    tracing::debug!(
        target: tracing_targets::COLLATOR,
        elapsed = %humantime::format_duration(elapsed),
        "merkle update created"
    );

    Ok(state_update)
}