use std::collections::{BTreeMap, VecDeque};
use anyhow::{Context, Result};
use tycho_block_util::queue::{QueuePartitionIdx, get_short_addr_string, get_short_hash_string};
use tycho_types::cell::HashBytes;
use tycho_types::models::{IntAddr, ShardIdent};
use tycho_util::FastHashSet;
use super::{
GetNextMessageGroupMode, InternalsPartitionReader, MessagesReaderMetrics,
MessagesReaderMetricsByPartitions,
};
use crate::collator::anchors_cache::AnchorsCacheTransaction;
use crate::collator::messages_buffer::{
BufferFillStateByCount, BufferFillStateBySlots, FillMessageGroupResult, IncludeAllMessages,
MessageGroup, MessagesBufferLimits, MsgFilter, SkipExpiredExternals,
};
use crate::collator::messages_reader::internals_range_reader::InternalsRangeReaderKind;
use crate::collator::messages_reader::state::ext::partition_range_reader::ExternalsPartitionRangeReaderState;
use crate::collator::messages_reader::state::ext::partition_reader::ExternalsPartitionReaderState;
use crate::collator::messages_reader::state::ext::range_reader::{
DebugExternalsRangeReaderState, ExternalsRangeReaderState,
};
use crate::collator::messages_reader::state::ext::reader::ExternalsReaderState;
use crate::collator::messages_reader::state::ext::{ExternalKey, ExternalsReaderRange};
use crate::collator::messages_reader::state::int::DebugInternalsRangeReaderState;
use crate::collator::messages_reader::state::with_prev_map_and_current;
use crate::collator::types::{
MsgsExecutionParamsExtension, MsgsExecutionParamsStuff, ParsedMessage,
};
use crate::internal_queue::types::message::InternalMessageValue;
use crate::internal_queue::types::router::PartitionRouter;
use crate::tracing_targets;
use crate::types::processed_upto::BlockSeqno;
use crate::types::{DebugIter, SaturatingAddAssign};
#[cfg(test)]
#[path = "../tests/externals_reader_tests.rs"]
pub(super) mod tests;
/// Reads external messages from the anchors cache into per-partition
/// buffers and collects them into message groups during block collation.
pub(super) struct ExternalsReader<'a, 'b> {
    /// Shard this reader collates for; used to filter message destinations.
    for_shard_id: ShardIdent,
    /// Seqno of the block being collated; keys the "current" range state.
    block_seqno: BlockSeqno,
    /// Chain time of the block being collated; used for expiration checks.
    next_chain_time: u64,
    /// Current (and possibly pending new) message execution parameters.
    msgs_exec_params: MsgsExecutionParamsStuff,
    /// Buffer fill limits applied per queue partition.
    buffer_limits_by_partitions: BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
    /// Source of imported anchors with external messages.
    anchors_cache: &'a mut AnchorsCacheTransaction<'b>,
    /// Persisted reader state (ranges and per-partition cursors).
    reader_state: &'a mut ExternalsReaderState,
    /// True when every open range has been read to its end.
    all_ranges_fully_read: bool,
}
impl<'a, 'b> ExternalsReader<'a, 'b> {
/// Creates an externals reader over the given state and anchors cache.
///
/// Ensures a per-partition reader state exists for every configured
/// partition and recomputes each range's `fully_read` flag from its
/// persisted read position.
pub fn new(
    for_shard_id: ShardIdent,
    block_seqno: BlockSeqno,
    next_chain_time: u64,
    msgs_exec_params: MsgsExecutionParamsStuff,
    buffer_limits_by_partitions: BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
    anchors_cache: &'a mut AnchorsCacheTransaction<'b>,
    reader_state: &'a mut ExternalsReaderState,
) -> Self {
    // Every partition that has buffer limits must have a state entry.
    buffer_limits_by_partitions.keys().for_each(|par_id| {
        reader_state.by_partitions.entry(*par_id).or_default();
    });

    // A range is fully read when its cursor has reached its end bound.
    reader_state.ranges.values_mut().for_each(|range_state| {
        range_state.fully_read = range_state.range.current_position == range_state.range.to;
    });

    Self {
        for_shard_id,
        block_seqno,
        next_chain_time,
        msgs_exec_params,
        buffer_limits_by_partitions,
        anchors_cache,
        reader_state,
        all_ranges_fully_read: false,
    }
}
/// Clears the `all_ranges_fully_read` flag so that a subsequent
/// `read_into_buffers()` call walks the open ranges again.
pub(super) fn reset_read_state(&mut self) {
    self.all_ranges_fully_read = false;
}
/// Finalizes reader state after collation.
///
/// Propagates the maximum processed offset per partition across ranges:
/// for the last range only, if the partition's current processed offset
/// exceeds the running maximum, it is stored into that range's state.
/// Also refreshes each range's `fully_read` flag from its read position.
pub fn finalize(&mut self) -> Result<()> {
    let mut max_processed_offsets = BTreeMap::<QueuePartitionIdx, u32>::new();

    // `by_partitions` is not mutated below, so clone it once up front
    // instead of once per range (previously the clone ran on every
    // iteration of the outer loop).
    let reader_state_by_partition = self.reader_state.by_partitions.clone();

    let seqnos: Vec<_> = self.reader_state.ranges.keys().copied().collect();
    let mut seqnos_iter = seqnos.into_iter().peekable();
    while let Some(seqno) = seqnos_iter.next() {
        let is_last = seqnos_iter.peek().is_none();
        // Key came from the same map above, so the entry must exist.
        let range_state = self.reader_state.ranges.get_mut(&seqno).unwrap();
        for (par_id, par) in reader_state_by_partition.iter() {
            let range_reader_state_by_partition =
                range_state.get_state_by_partition_mut(*par_id)?;
            // Track the maximum processed offset seen so far for this partition.
            let max_processed_offset = max_processed_offsets
                .entry(*par_id)
                .and_modify(|max| {
                    *max = range_reader_state_by_partition.processed_offset.max(*max);
                })
                .or_insert(*range_reader_state_by_partition.processed_offset);
            // Only the last range absorbs the current processed offset.
            if par.curr_processed_offset > *max_processed_offset && is_last {
                *range_reader_state_by_partition.processed_offset = par.curr_processed_offset;
            }
        }
        if range_state.range.current_position == range_state.range.to {
            range_state.fully_read = true;
        }
    }
    Ok(())
}
/// Returns true when the number of open range states has reached the
/// configured limit, so no new range reader should be created.
pub fn open_ranges_limit_reached(&self) -> bool {
    self.reader_state.ranges.len() >= self.msgs_exec_params.current().open_ranges_limit()
}
/// Returns a shared reference to the underlying reader state.
pub fn reader_state(&self) -> &ExternalsReaderState {
    self.reader_state
}
/// Returns the ids of all partitions tracked by this reader.
pub fn get_partition_ids(&self) -> Vec<QueuePartitionIdx> {
    self.reader_state.by_partitions.keys().copied().collect()
}
/// Clears the recorded chain time of the last read anchor.
pub fn drop_last_read_to_anchor_chain_time(&mut self) {
    *self.reader_state.last_read_to_anchor_chain_time = None;
}
/// Replaces the per-partition buffer limits used on subsequent reads.
pub fn set_buffer_limits_by_partition(
    &mut self,
    buffer_limits_by_partitions: BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
) {
    self.buffer_limits_by_partitions = buffer_limits_by_partitions;
}
/// Returns true when any partition in any range has a non-zero
/// processed offset.
pub fn has_non_zero_processed_offset(&self) -> bool {
    self.reader_state
        .ranges
        .values()
        .flat_map(|range| range.by_partitions.values())
        .any(|par| *par.processed_offset > 0)
}
/// Returns true when the partition's current processed offset has
/// caught up with the processed offset stored in the last range state.
///
/// Defaults to `true` when the partition state, the last range, or the
/// partition's entry within that range is missing.
pub fn last_range_offset_reached(&self, par_id: &QueuePartitionIdx) -> bool {
    self.reader_state
        .by_partitions
        .get(par_id)
        .map(|state_by_partition| {
            self.get_last_range_state().map(|(_, r)| {
                r.by_partitions.get(par_id).map(|range_state_by_partition| {
                    *range_state_by_partition.processed_offset
                        <= state_by_partition.curr_processed_offset
                })
            })
        })
        // Flatten Option<Result<Option<bool>>> down to Option<bool>.
        .and_then(|res| res.ok())
        .and_then(|res| res)
        .unwrap_or(true)
}
/// Returns true when, for every partition, the current processed offset
/// has caught up with the last range's stored processed offset.
///
/// Defaults to `true` when there is no last range.
pub fn last_range_offsets_reached_in_all_partitions(&self) -> bool {
    self.get_last_range_state().map_or(true, |(_, r)| {
        r.by_partitions.iter().all(|(par_id, par)| {
            *par.processed_offset
                <= self
                    .reader_state
                    .by_partitions
                    .get(par_id)
                    // NOTE(review): panics if a range references a partition
                    // missing from `by_partitions` (unlike the graceful
                    // default in `last_range_offset_reached`) — presumably
                    // an invariant upheld by `new()`; confirm.
                    .unwrap()
                    .curr_processed_offset
        })
    })
}
/// Counts buffered messages per partition, summed across all ranges.
pub fn count_messages_in_buffers_by_partitions(&self) -> BTreeMap<QueuePartitionIdx, usize> {
    let mut counts: BTreeMap<QueuePartitionIdx, usize> = BTreeMap::new();
    for range in self.reader_state.ranges.values() {
        for (par_id, par) in range.by_partitions.iter() {
            let total = counts.entry(*par_id).or_default();
            // Saturating add: totals never wrap even in degenerate cases.
            *total = total.saturating_add(par.buffer.msgs_count());
        }
    }
    counts
}
/// Returns true when any partition buffer in any range is non-empty.
pub fn has_messages_in_buffers(&self) -> bool {
    self.reader_state
        .ranges
        .values()
        .flat_map(|range| range.by_partitions.values())
        .any(|par| par.buffer.msgs_count() > 0)
}
/// Returns true while at least one range still has unread externals.
pub fn has_not_fully_read_ranges(&self) -> bool {
    !self.all_ranges_fully_read
}
/// Returns true when all ranges were fully read (per the cached flag)
/// and every buffer has been drained.
pub fn all_ranges_read_and_collected(&self) -> bool {
    self.all_ranges_fully_read && !self.has_messages_in_buffers()
}
/// Like `all_ranges_read_and_collected` but recomputes from each
/// range's `fully_read` flag instead of the cached aggregate flag.
pub fn check_all_ranges_read_and_collected(&self) -> bool {
    self.reader_state.ranges.values().all(|v| v.fully_read) && !self.has_messages_in_buffers()
}
/// Returns true when every range's partition buffer is empty and the
/// partition's current processed offset has reached its skip offset
/// (i.e. all previously read externals were actually collected).
pub fn all_read_externals_collected(&self) -> bool {
    self.reader_state.ranges.values().all(|r| {
        r.by_partitions.iter().all(|(par_id, s)| {
            s.buffer.msgs_count() == 0
                && self
                    .reader_state
                    .by_partitions
                    .get(par_id)
                    .map(|r_s| r_s.curr_processed_offset)
                    .unwrap_or_default()
                    >= *s.skip_offset
        })
    })
}
/// Returns true when the anchors cache still holds unread externals.
pub fn has_pending_externals(&self) -> bool {
    self.anchors_cache.has_pending_externals()
}
/// Returns `(partition, processed_offset)` pairs from the last range
/// state, or an empty vec when no range exists.
pub fn get_last_range_state_offsets_by_partitions(&self) -> Vec<(QueuePartitionIdx, u32)> {
    match self.get_last_range_state() {
        Ok((_, last_range)) => last_range
            .by_partitions
            .iter()
            .map(|(par_id, par)| (*par_id, *par.processed_offset))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Drops every range state except the newest one.
///
/// If the surviving range is older than the current block, reading is
/// re-enabled so a fresh range can be opened for this block.
pub fn retain_only_last_range_state(&mut self) -> Result<()> {
    let last_seqno = *self.reader_state.ranges.last_key_value().context(
        "externals reader should have at least one range state when retain_only_last_range_state() called",
    )?.0;
    if last_seqno < self.block_seqno {
        self.all_ranges_fully_read = false;
    }
    self.reader_state
        .ranges
        .retain(|&seqno, _| seqno == last_seqno);
    Ok(())
}
/// Moves the last range's `from` bound up to its current read position,
/// discarding the already-consumed prefix of the range.
pub fn set_from_to_current_position_in_last_range_state(&mut self) -> Result<()> {
    let last_range_state = self.get_last_range_state_mut()?;
    last_range_state.range.from = last_range_state.range.current_position;
    Ok(())
}
/// Returns the newest (highest-seqno) range state, or an error when no
/// range exists.
pub fn get_last_range_state(&self) -> Result<(&BlockSeqno, &ExternalsRangeReaderState)> {
    self.reader_state
        .ranges
        .last_key_value()
        .context("externals reader should have at least one range reader")
}
/// Mutable variant of [`Self::get_last_range_state`].
pub fn get_last_range_state_mut(&mut self) -> Result<&mut ExternalsRangeReaderState> {
    // Look the key up immutably first, then re-borrow mutably: the key
    // was just observed, so the `unwrap` cannot fail.
    let (&last_seqno, _) = self.get_last_range_state()?;
    Ok(self.reader_state.ranges.get_mut(&last_seqno).unwrap())
}
pub fn increment_curr_processed_offset(&mut self, par_id: &QueuePartitionIdx) -> Result<()> {
let reader_state_by_partition = self
.reader_state
.by_partitions
.get_mut(par_id)
.with_context(|| format!("externals reader state not exists for partition {par_id}"))?;
reader_state_by_partition.curr_processed_offset += 1;
Ok(())
}
/// Resets the partition's current processed offset and the last range's
/// stored processed offset to zero; optionally resets the skip offset too.
pub fn drop_processing_offset(
    &mut self,
    par_id: QueuePartitionIdx,
    drop_skip_offset: bool,
) -> Result<()> {
    let reader_state_by_partition = self.get_state_by_partition_mut(par_id)?;
    reader_state_by_partition.curr_processed_offset = 0;
    let last_range_state = self.get_last_range_state_mut()?;
    let last_range_state_by_partition = last_range_state.get_state_by_partition_mut(par_id)?;
    *last_range_state_by_partition.processed_offset = 0;
    if drop_skip_offset {
        *last_range_state_by_partition.skip_offset = 0;
    }
    Ok(())
}
/// Advances the last range's processed and skip offsets to the
/// partition's current processed offset, but only forward — offsets are
/// never moved backwards.
pub fn set_skip_processed_offset_to_current(
    &mut self,
    par_id: QueuePartitionIdx,
) -> Result<()> {
    let curr_processed_offset = self.get_state_by_partition(par_id)?.curr_processed_offset;
    let last_range_state = self.get_last_range_state_mut()?;
    let last_range_state_by_partition = last_range_state.get_state_by_partition_mut(par_id)?;
    if curr_processed_offset > *last_range_state_by_partition.processed_offset {
        *last_range_state_by_partition.processed_offset = curr_processed_offset;
        *last_range_state_by_partition.skip_offset = curr_processed_offset;
    }
    Ok(())
}
/// Marks everything up to the last range's current read position as
/// processed for the given partition.
pub fn set_processed_to_current_position(&mut self, par_id: QueuePartitionIdx) -> Result<()> {
    // Copy the position out first to release the immutable borrow.
    let position = self.get_last_range_state()?.1.range.current_position;
    self.get_state_by_partition_mut(par_id)?.processed_to = position;
    Ok(())
}
/// Creates the range state for the current block seqno and appends it,
/// re-enabling reading.
///
/// # Panics
/// Panics if a range state for this block seqno already exists.
fn create_append_next_range_state(&mut self) {
    let range_state = self.create_next_externals_range_state();
    // NOTE(review): `insert` here is used as a boolean-returning insert
    // (truthy when the key already existed) — presumably `ranges` is a
    // custom map wrapper rather than a std `BTreeMap` (whose `insert`
    // returns `Option`); confirm against the wrapper's definition.
    if self
        .reader_state
        .ranges
        .insert(self.block_seqno, range_state)
    {
        panic!(
            "externals range state should not already exist (for_shard_id: {}, seqno: {})",
            self.for_shard_id, self.block_seqno
        )
    };
    self.all_ranges_fully_read = false;
}
#[tracing::instrument(skip_all)]
/// Builds the next range reader state.
///
/// The new range is empty: it starts where the previous range ended
/// (`from == to == current_position`) and carries the next chain time.
/// Each partition's processed and skip offsets are seeded from that
/// partition's current processed offset.
fn create_next_externals_range_state(&self) -> ExternalsRangeReaderState {
    let by_partitions: BTreeMap<_, _> = self
        .reader_state
        .by_partitions
        .iter()
        .map(|(par_id, par)| {
            let par_range_state = ExternalsPartitionRangeReaderState::new(
                Default::default(),
                par.curr_processed_offset,
                par.curr_processed_offset,
                None,
            );
            (*par_id, par_range_state)
        })
        .collect();

    // Continue from the end of the newest existing range, or from the
    // default key when there are no ranges yet.
    let from = self
        .reader_state
        .ranges
        .values()
        .last()
        .map(|r| r.range.to)
        .unwrap_or_default();

    let reader_state = ExternalsRangeReaderState::new(
        ExternalsReaderRange {
            from,
            to: from,
            current_position: from,
            chain_time: self.next_chain_time,
        },
        by_partitions,
    );

    tracing::debug!(target: tracing_targets::COLLATOR,
        seqno = self.block_seqno,
        fully_read = reader_state.fully_read,
        reader_state = ?DebugExternalsRangeReaderState(&reader_state),
        "externals reader: created next range reader state",
    );

    reader_state
}
/// Reads externals from the anchors cache into per-partition buffers.
///
/// Walks every open range state (oldest first). Already-read and
/// skip-gated ranges are skipped; the rest are read via
/// `read_externals_into_buffers`. When every existing range is
/// exhausted, a new range for the current block may be opened (unless
/// refilling, new exec params are pending, or the open-ranges limit is
/// hit). Stops early once buffers can fill a message group or are full.
///
/// Returns accumulated read metrics by partition.
pub fn read_into_buffers(
    &mut self,
    read_mode: GetNextMessageGroupMode,
    partition_router: &PartitionRouter,
) -> Result<MessagesReaderMetricsByPartitions> {
    let mut metrics_by_partitions = MessagesReaderMetricsByPartitions::default();

    // Nothing to do if a previous call already exhausted all ranges.
    if self.all_ranges_fully_read {
        tracing::trace!(target: tracing_targets::COLLATOR,
            "externals reader: all_ranges_fully_read=true",
        );
        return Ok(metrics_by_partitions);
    }

    // Snapshot of each partition's processed-to key, used to drop
    // messages that were already processed in previous blocks.
    let processed_to_by_partitions: BTreeMap<_, _> = self
        .reader_state
        .by_partitions
        .iter()
        .map(|(par_id, par)| (*par_id, par.processed_to))
        .collect();

    let mut ranges_seqno: VecDeque<_> = self.reader_state.ranges.keys().copied().collect();
    let mut last_seqno = 0;
    let mut last_ext_read_res_opt = None;

    let externals_expire_timeout =
        self.msgs_exec_params.current().externals_expire_timeout as u64;

    'main_loop: loop {
        let mut all_ranges_fully_read = true;
        while let Some(seqno) = ranges_seqno.pop_front() {
            // Immutable pre-checks in a nested scope so the mutable
            // borrow below does not conflict.
            {
                let reader_state = self.reader_state.ranges.get(&seqno).unwrap_or_else(||
                    panic!(
                        "externals range reader state should exists (for_shard_id: {}, seqno: {}, block_seqno: {})",
                        self.for_shard_id, seqno, self.block_seqno,
                    )
                );
                tracing::trace!(target: tracing_targets::COLLATOR,
                    seqno,
                    fully_read = reader_state.fully_read,
                    range_reader_state = ?DebugExternalsRangeReaderState(reader_state),
                    "externals reader: try to read externals from range,"
                );

                last_seqno = seqno;

                // Skip ranges that were already read to the end.
                if reader_state.fully_read {
                    continue;
                }

                // On refill we must not read the current block's range —
                // it would pull in messages beyond the refilled state.
                if read_mode == GetNextMessageGroupMode::Refill && seqno == self.block_seqno {
                    all_ranges_fully_read = false;
                    continue;
                }

                // If every partition is still below its skip offset for
                // this range, the buffered content must be consumed first.
                if self.reader_state.by_partitions.iter().all(|(par_id, s)| {
                    s.curr_processed_offset
                        < reader_state
                            .by_partitions
                            .get(par_id)
                            .map(|s| *s.skip_offset)
                            .unwrap_or_default()
                }) {
                    tracing::trace!(target: tracing_targets::COLLATOR,
                        externals_reader_state = ?DebugIter(self.reader_state.by_partitions.iter()),
                        "externals reader: skip offset not reached in all partitions",
                    );
                    all_ranges_fully_read = false;
                    continue;
                }
            }

            let reader_state = self.reader_state.ranges.get_mut(&seqno).unwrap();

            // Only the current block's range reads to the end of the
            // cache; historical ranges re-read up to their previous `to`.
            let read_mode =
                if seqno == self.block_seqno && read_mode != GetNextMessageGroupMode::Refill {
                    ReadNextExternalsMode::ToTheEnd
                } else {
                    ReadNextExternalsMode::ToPreviuosReadTo
                };

            let mut read_res = read_externals_into_buffers(
                &self.for_shard_id,
                &seqno,
                self.anchors_cache,
                reader_state,
                &self.buffer_limits_by_partitions,
                read_mode,
                partition_router,
                &processed_to_by_partitions,
                externals_expire_timeout,
            )?;

            metrics_by_partitions.append(std::mem::take(&mut read_res.metrics_by_partitions));

            if !reader_state.fully_read {
                all_ranges_fully_read = false;
            } else if seqno == self.block_seqno {
                // The current block's range ends at the last imported
                // anchor; pin its cursor and upper bound there.
                if let Some(anchor_info) = self.anchors_cache.last_imported_anchor_info() {
                    reader_state.range.current_position = ExternalKey {
                        anchor_id: anchor_info.id,
                        msgs_offset: anchor_info.all_exts_count as u64,
                    };
                    reader_state.range.to = reader_state.range.current_position;
                }
            }

            let max_fill_state_by_slots = read_res.max_fill_state_by_slots;
            let max_fill_state_by_count = read_res.max_fill_state_by_count;
            last_ext_read_res_opt = Some(read_res);

            // Enough buffered to fill all message-group slots — stop.
            if max_fill_state_by_slots == BufferFillStateBySlots::CanFill {
                break 'main_loop;
            }
            // Buffer hit the count limit while the range still has
            // unread messages — stop and let collection drain it.
            if max_fill_state_by_count == BufferFillStateByCount::IsFull
                && !reader_state.fully_read
            {
                break 'main_loop;
            }
        }

        if all_ranges_fully_read {
            if last_seqno < self.block_seqno {
                if !self.open_ranges_limit_reached() {
                    if read_mode == GetNextMessageGroupMode::Continue {
                        // Pending new exec params: freeze range creation
                        // until they are applied.
                        if self.msgs_exec_params.new_is_some() {
                            tracing::debug!(target: tracing_targets::COLLATOR,
                                last_seqno,
                                "externals reader: do not create next range reader on Continue because new message exec params exists",
                            );
                            self.all_ranges_fully_read = true;
                            break;
                        }
                        // Open the current block's range and read it on
                        // the next pass of 'main_loop.
                        self.create_append_next_range_state();
                        ranges_seqno.push_back(self.block_seqno);
                    } else {
                        tracing::debug!(target: tracing_targets::COLLATOR,
                            last_seqno,
                            "externals reader: do not create next range reader on Refill",
                        );
                        self.all_ranges_fully_read = true;
                        break;
                    }
                } else {
                    tracing::debug!(target: tracing_targets::COLLATOR,
                        last_seqno,
                        open_ranges_limit = self.msgs_exec_params.current().open_ranges_limit,
                        "externals reader: open ranges limit reached",
                    );
                    self.all_ranges_fully_read = true;
                    break;
                }
            } else {
                self.all_ranges_fully_read = true;
                break;
            }
        } else {
            break;
        }
    }

    // Remember the chain time of the last anchor we read up to.
    if let Some(read_res) = last_ext_read_res_opt {
        if let Some(ct) = read_res.last_read_to_anchor_chain_time {
            *self.reader_state.last_read_to_anchor_chain_time = Some(ct);
        }
    }

    Ok(metrics_by_partitions)
}
/// Collects buffered externals of one partition into the message group.
///
/// Walks the ranges oldest-first. A range contributes only when the
/// partition's current processed offset is beyond the range's skip
/// offset; expired externals may be filtered out along the way.
/// Accounts already claimed by previous partitions' groups, buffers, or
/// remaining-stats are skipped via `should_skip_external_account`.
/// Stops after the first range whose stored processed offset has caught
/// up with the current offset.
pub fn collect_messages<V: InternalMessageValue>(
    &mut self,
    par_id: QueuePartitionIdx,
    msg_group: &mut MessageGroup,
    curr_partition_reader: Option<&InternalsPartitionReader<'_, V>>,
    prev_partitions_readers: &BTreeMap<QueuePartitionIdx, InternalsPartitionReader<'_, V>>,
    prev_msg_groups: &BTreeMap<QueuePartitionIdx, MessageGroup>,
    already_skipped_accounts: &mut FastHashSet<HashBytes>,
) -> Result<CollectExternalsResult> {
    let mut res = CollectExternalsResult::default();
    let for_shard_id = self.for_shard_id;
    let buffer_limits =
        get_buffer_limits_by_partition(&self.buffer_limits_by_partitions, &par_id)?;
    let curr_processed_offset = self.get_state_by_partition(par_id)?.curr_processed_offset;

    // Use the chain time of the newest range we are actually going to
    // collect from; fall back to the block's chain time.
    let mut next_chain_time = 0;
    for state in self.reader_state.ranges.values() {
        let range_state_by_partition = state.get_state_by_partition(par_id)?;
        if curr_processed_offset > *range_state_by_partition.skip_offset {
            next_chain_time = state.range.chain_time;
        }
    }
    if next_chain_time == 0 {
        next_chain_time = self.next_chain_time;
    }
    anyhow::ensure!(next_chain_time > 0);

    let externals_expire_timeout_ms =
        self.msgs_exec_params.current().externals_expire_timeout as u64 * 1000;
    let mut expired_msgs_count = 0;

    let seqnos: Vec<BlockSeqno> = self.reader_state.ranges.keys().cloned().collect();
    for &seqno in &seqnos {
        // Gives the closure the current range mutably plus a map of all
        // previous ranges, needed by the skip-account predicate.
        let should_break = with_prev_map_and_current(
            self.reader_state.ranges.inner_mut(),
            seqno,
            |prev_map, current_state: &mut ExternalsRangeReaderState| {
                let range_state_by_partition =
                    current_state.get_state_by_partition_mut(par_id)?;
                if curr_processed_offset > *range_state_by_partition.skip_offset {
                    let mut msg_filter = MsgFilter::IncludeAll(IncludeAllMessages);
                    // Run the expiration check at most once per chain time.
                    if matches!(*range_state_by_partition.last_expire_check_on_ct, Some(last) if next_chain_time > last)
                        || range_state_by_partition.last_expire_check_on_ct.is_none()
                    {
                        *range_state_by_partition.last_expire_check_on_ct =
                            Some(next_chain_time);
                        let chain_time_threshold_ms =
                            next_chain_time.saturating_sub(externals_expire_timeout_ms);
                        // Only attach the filter when the buffer can
                        // actually contain expired messages.
                        if range_state_by_partition.buffer.min_ext_chain_time()
                            < chain_time_threshold_ms
                        {
                            msg_filter =
                                MsgFilter::SkipExpiredExternals(SkipExpiredExternals {
                                    chain_time_threshold_ms,
                                    total_skipped: &mut expired_msgs_count,
                                });
                        }
                    }

                    res.metrics.add_to_message_groups_timer.start();
                    let FillMessageGroupResult {
                        ops_count,
                        collected_count,
                        ..
                    } = range_state_by_partition.buffer.fill_message_group(
                        msg_group,
                        buffer_limits.slots_count,
                        buffer_limits.slot_vert_size,
                        already_skipped_accounts,
                        |account_id| {
                            should_skip_external_account(
                                par_id,
                                account_id,
                                prev_msg_groups,
                                prev_partitions_readers,
                                curr_partition_reader,
                                curr_processed_offset,
                                for_shard_id,
                                prev_map,
                            )
                        },
                        msg_filter,
                    );
                    res.metrics
                        .add_to_msgs_groups_ops_count
                        .saturating_add_assign(ops_count);
                    res.metrics.add_to_message_groups_timer.stop();

                    res.collected_count.saturating_add_assign(collected_count);
                }
                // Stop once this range's stored offset caught up with
                // the current processed offset.
                let range_processed_offset = *range_state_by_partition.processed_offset;
                Ok::<_, anyhow::Error>(curr_processed_offset <= range_processed_offset)
            },
        )?;
        if should_break {
            break;
        }
    }

    res.metrics.expired_ext_msgs_count += expired_msgs_count;

    tracing::debug!(target: tracing_targets::COLLATOR,
        partition_id = %par_id,
        ext_curr_processed_offset = curr_processed_offset,
        ext_msgs_collected_count = res.collected_count,
        expired_ext_msgs_count = res.metrics.expired_ext_msgs_count,
        "external messages collected",
    );

    let labels = [("workchain", self.for_shard_id.workchain().to_string())];
    metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels)
        .increment(expired_msgs_count);

    Ok(res)
}
/// Returns a mutable reference to the partition's reader state.
///
/// # Errors
/// Fails when no reader state exists for the partition.
pub fn get_state_by_partition_mut<T: Into<QueuePartitionIdx>>(
    &mut self,
    par_id: T,
) -> anyhow::Result<&mut ExternalsPartitionReaderState> {
    let par_id = par_id.into();
    self.reader_state
        .by_partitions
        .get_mut(&par_id)
        // Message had a stray "partition2" marker; aligned with the
        // wording used by the other partition-lookup errors.
        .with_context(|| format!("externals reader state not exists for partition {par_id}"))
}
/// Returns a shared reference to the partition's reader state.
///
/// # Errors
/// Fails when no reader state exists for the partition.
pub fn get_state_by_partition<T: Into<QueuePartitionIdx>>(
    &self,
    par_id: T,
) -> anyhow::Result<&ExternalsPartitionReaderState> {
    let par_id = par_id.into();
    self.reader_state
        .by_partitions
        .get(&par_id)
        // Message had a stray "partition3" marker; aligned with the
        // wording used by the other partition-lookup errors.
        .with_context(|| format!("externals reader state not exists for partition {par_id}"))
}
}
/// Result of a `collect_messages` call.
#[derive(Default)]
pub(super) struct CollectExternalsResult {
    /// Metrics gathered while filling the message group.
    pub metrics: MessagesReaderMetrics,
    /// Number of externals moved into the message group.
    pub collected_count: usize,
}
/// How far `read_externals_into_buffers` should read a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadNextExternalsMode {
    /// Read everything available in the anchors cache (current block's range).
    ToTheEnd,
    /// Re-read only up to the range's previously recorded `to` bound.
    /// NOTE(review): variant name misspells "Previous"; kept as-is since
    /// renaming would break external matches on this public enum.
    ToPreviuosReadTo,
}
/// Result of reading one externals range into buffers.
#[derive(Default)]
struct ReadExternalsRangeResult {
    /// Chain time of the last anchor read, if any anchor was reached.
    pub last_read_to_anchor_chain_time: Option<u64>,
    /// Whether any partition buffer hit its message-count limit.
    pub max_fill_state_by_count: BufferFillStateByCount,
    /// Whether any partition buffer can fill all message-group slots.
    pub max_fill_state_by_slots: BufferFillStateBySlots,
    /// Read metrics accumulated per partition.
    pub metrics_by_partitions: MessagesReaderMetricsByPartitions,
}
/// Decides whether externals for `account_id` must be skipped to
/// preserve per-account message ordering across partitions and ranges.
///
/// An account is skipped when it already has messages pending in: a
/// previous partition's message group, a previous partition reader's
/// buffers or remaining stats, a previous externals range buffer, or the
/// current partition's internal range readers (buffers or remaining
/// stats). Returns `(skip, check_ops_count)` where the count tracks how
/// many lookups were performed (for metrics).
#[allow(clippy::too_many_arguments)]
fn should_skip_external_account<V: InternalMessageValue>(
    par_id: QueuePartitionIdx,
    account_id: &HashBytes,
    prev_msg_groups: &BTreeMap<QueuePartitionIdx, MessageGroup>,
    prev_partitions_readers: &BTreeMap<QueuePartitionIdx, InternalsPartitionReader<'_, V>>,
    curr_partition_reader: Option<&InternalsPartitionReader<'_, V>>,
    curr_processed_offset: u32,
    for_shard_id: ShardIdent,
    prev_states: &BTreeMap<u32, ExternalsRangeReaderState>,
) -> (bool, u64) {
    let mut check_ops_count = 0;

    // NOTE(review): workchain() is narrowed with `as i8` — fine for the
    // standard workchains (0 / -1), but silently truncates otherwise;
    // confirm the invariant.
    let dst_addr = IntAddr::from((for_shard_id.workchain() as i8, *account_id));

    // 1) Account already present in a previous partition's message group.
    for msg_group in prev_msg_groups.values() {
        if msg_group.messages_count() > 0 {
            check_ops_count.saturating_add_assign(1);
            if msg_group.contains_account(account_id) {
                tracing::trace!(target: tracing_targets::COLLATOR,
                    partition_id = %par_id,
                    account_id = %get_short_hash_string(account_id),
                    "external messages skipped for account - msg_group of prev partition",
                );
                return (true, check_ops_count);
            }
        }
    }

    // 2) Account has buffered internals or remaining stats in a previous
    //    partition's reader.
    for prev_par_reader in prev_partitions_readers.values() {
        for prev_par_reader_state in prev_par_reader.reader_state.ranges.values() {
            if prev_par_reader_state.buffer.msgs_count() > 0 {
                check_ops_count.saturating_add_assign(1);
                if prev_par_reader_state
                    .buffer
                    .account_messages_count(account_id)
                    > 0
                {
                    tracing::trace!(target: tracing_targets::COLLATOR,
                        partition_id = %par_id,
                        account_id = %get_short_hash_string(account_id),
                        "external messages skipped for account - prev partition range reader buffer",
                    );
                    return (true, check_ops_count);
                }
            }
        }
        check_ops_count.saturating_add_assign(1);
        if let Some(remaning_msgs_stats) = &prev_par_reader.remaning_msgs_stats
            && remaning_msgs_stats.contains(&dst_addr)
        {
            tracing::trace!(target: tracing_targets::COLLATOR,
                partition_id = %par_id,
                account_id = %get_short_hash_string(account_id),
                "external messages skipped for account - prev partition reader remaning stats",
            );
            return (true, check_ops_count);
        }
    }

    // 3) Account has buffered externals in a previous range of this reader.
    for (_, state) in prev_states.iter() {
        let buffer = &state.get_state_by_partition(par_id).unwrap().buffer;
        if buffer.msgs_count() > 0 {
            check_ops_count.saturating_add_assign(1);
            if buffer.account_messages_count(account_id) > 0 {
                tracing::trace!(target: tracing_targets::COLLATOR,
                    partition_id = %par_id,
                    account_id = %get_short_hash_string(account_id),
                    "external messages skipped for account - prev externals range reader buffer",
                );
                return (true, check_ops_count);
            }
        }
    }

    // 4) Account has pending internals in the current partition's range
    //    readers (before the new-messages range and past the skip offset).
    if let Some(curr_partition_reader) = curr_partition_reader {
        for curr_par_range_reader in curr_partition_reader.range_readers().values() {
            if matches!(
                curr_par_range_reader.kind,
                InternalsRangeReaderKind::NewMessages
            ) {
                break;
            }
            let state = curr_partition_reader
                .reader_state
                .ranges
                .get(&curr_par_range_reader.seqno)
                .unwrap();
            if curr_processed_offset <= *state.skip_offset {
                break;
            }
            if state.buffer.msgs_count() > 0 {
                check_ops_count.saturating_add_assign(1);
                if state.buffer.account_messages_count(account_id) > 0 {
                    tracing::trace!(target: tracing_targets::COLLATOR,
                        partition_id = %par_id,
                        account_id = %get_short_hash_string(account_id),
                        rr_seqno = curr_par_range_reader.seqno,
                        rr_kind = ?curr_par_range_reader.kind,
                        reader_state = ?DebugInternalsRangeReaderState(state),
                        "external messages skipped for account - current partition range reader buffer",
                    );
                    return (true, check_ops_count);
                }
            }
            check_ops_count.saturating_add_assign(1);
            if state.contains_account_addr_in_remaning_msgs_stats(&dst_addr) {
                tracing::trace!(target: tracing_targets::COLLATOR,
                    partition_id = %par_id,
                    account_id = %get_short_hash_string(account_id),
                    rr_seqno = curr_par_range_reader.seqno,
                    rr_kind = ?curr_par_range_reader.kind,
                    reader_state = ?DebugInternalsRangeReaderState(state),
                    remaming_msgs_stats = ?state
                        .remaning_msgs_stats.inner()
                        .map(|stats| DebugIter(stats.statistics().iter().map(|(addr, count)|
                            (get_short_addr_string(addr), count)
                        ))),
                    "external messages skipped for account - current partition range reader remaining stats",
                );
                return (true, check_ops_count);
            }
        }
    }

    (false, check_ops_count)
}
/// Reads externals from the anchors cache into the given range state's
/// per-partition buffers.
///
/// Iterates anchors from the front of the cache: anchors older than the
/// current read position are dropped, fully expired anchors are counted
/// and dropped, and messages addressed to this shard are routed to their
/// partition buffer (unless already processed). Reading stops when the
/// buffers fill up or, in `ToPreviuosReadTo` mode, when the previous
/// `to` bound is reached. Updates `reader_state.range` cursor/bounds and
/// `fully_read`, and reports per-partition metrics.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
fn read_externals_into_buffers(
    for_shard_id: &ShardIdent,
    seqno: &BlockSeqno,
    anchors_cache: &mut AnchorsCacheTransaction<'_>,
    reader_state: &mut ExternalsRangeReaderState,
    buffer_limits_by_partitions: &BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
    read_mode: ReadNextExternalsMode,
    partition_router: &PartitionRouter,
    processed_to_by_partitions: &BTreeMap<QueuePartitionIdx, ExternalKey>,
    externals_expire_timeout: u64,
) -> Result<ReadExternalsRangeResult> {
    let labels = [("workchain", for_shard_id.workchain().to_string())];

    let next_chain_time = reader_state.range.chain_time;

    tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
        next_chain_time,
        ?read_mode,
        fully_read = reader_state.fully_read,
        "read externals",
    );

    let mut metrics_by_partitions = MessagesReaderMetricsByPartitions::default();
    // The read timer is accounted on partition 0 (corrected at the end).
    metrics_by_partitions
        .get_mut(QueuePartitionIdx::ZERO)
        .read_ext_messages_timer
        .start();

    let was_read_to = reader_state.range.current_position;
    let prev_to = reader_state.range.to;
    let mut prev_to_reached = false;

    // Buffers may already be full from a previous read.
    let (mut max_fill_state_by_count, mut max_fill_state_by_slots) =
        get_max_buffers_fill_state(buffer_limits_by_partitions, reader_state)?;
    let mut has_filled_buffer = matches!(
        (&max_fill_state_by_count, &max_fill_state_by_slots),
        (BufferFillStateByCount::IsFull, _) | (_, BufferFillStateBySlots::CanFill)
    );

    let mut last_read_to_anchor_chain_time = None;
    let mut msgs_read_offset_in_last_anchor;
    let mut has_pending_externals_in_last_read_anchor = false;
    let mut total_msgs_imported = 0;
    let mut count_expired_anchors = 0_u32;
    let mut count_expired_messages = 0_u64;

    // Always inspect the front of the cache; fully consumed anchors are
    // popped, so index 0 is the next candidate each iteration.
    let next_idx = 0;
    loop {
        let next_entry = anchors_cache.get(next_idx);
        let (anchor_id, anchor) = match next_entry {
            Some(entry) => entry,
            None => {
                tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                    "no next entry in anchors cache",
                );
                reader_state.fully_read = true;
                break;
            }
        };

        // Drop anchors older than the current read position.
        if anchor_id < was_read_to.anchor_id {
            assert_eq!(next_idx, 0);
            anchors_cache.pop_front();
            tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                anchor_id,
                "anchor already read, removed from anchors cache",
            );
            continue;
        }

        last_read_to_anchor_chain_time = Some(anchor.chain_time);
        tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
            last_read_anchor_id = anchor_id,
            last_read_anchor_chain_time = anchor.chain_time,
        );

        // Resume mid-anchor only in the anchor we previously stopped in.
        if anchor_id == was_read_to.anchor_id {
            msgs_read_offset_in_last_anchor = was_read_to.msgs_offset;
        } else {
            msgs_read_offset_in_last_anchor = 0;
        }

        tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
            anchor_id,
            msgs_read_offset_in_last_anchor,
            "externals count: {}", anchor.externals.len(),
        );

        prev_to_reached = anchor_id > prev_to.anchor_id
            || (anchor_id == prev_to.anchor_id
                && msgs_read_offset_in_last_anchor == prev_to.msgs_offset);

        // Whole anchor expired: count its messages for this shard and drop it.
        let externals_expire_timeout_ms = externals_expire_timeout * 1000;
        if next_chain_time.saturating_sub(anchor.chain_time) > externals_expire_timeout_ms {
            let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize);
            let mut expired_msgs_count = 0;
            for ext_msg in iter {
                if for_shard_id.contains_address(&ext_msg.info.dst) {
                    tracing::trace!(target: tracing_targets::COLLATOR,
                        anchor_id,
                        anchor_chain_time = anchor.chain_time,
                        next_chain_time,
                        "ext_msg hash: {}, dst: {} is expired by timeout {} ms",
                        ext_msg.hash(), ext_msg.info.dst, externals_expire_timeout_ms,
                    );
                    expired_msgs_count += 1;

                    let target_partition = partition_router.get_partition(None, &ext_msg.info.dst);
                    let par_metrics = metrics_by_partitions.get_mut(target_partition);
                    par_metrics.expired_ext_msgs_count += 1;
                }
            }

            assert_eq!(next_idx, 0);
            anchors_cache.pop_front();
            tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                anchor_id,
                anchor_chain_time = anchor.chain_time,
                next_chain_time,
                expired_msgs_count,
                "anchor fully skipped due to expiration, removed from anchors cache",
            );

            count_expired_anchors = count_expired_anchors.saturating_add(1);
            count_expired_messages = count_expired_messages.saturating_add(expired_msgs_count);

            // Advance the cursor past the skipped anchor.
            let curr_ext_key = ExternalKey {
                anchor_id,
                msgs_offset: msgs_read_offset_in_last_anchor,
            };
            reader_state.range.current_position = curr_ext_key;
            if reader_state.range.current_position > reader_state.range.to {
                reader_state.range.to = reader_state.range.current_position;
            }

            continue;
        }

        let mut msgs_imported_from_last_anchor = 0;
        let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize);
        for ext_msg in iter {
            tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                anchor_id,
                "read ext_msg dst: {}", ext_msg.info.dst,
            );

            // Keep reading unless buffers are full or (in
            // ToPreviuosReadTo mode) the previous bound was reached.
            if !(has_filled_buffer
                || read_mode == ReadNextExternalsMode::ToPreviuosReadTo && prev_to_reached)
            {
                msgs_read_offset_in_last_anchor += 1;
                let curr_ext_key = ExternalKey {
                    anchor_id,
                    msgs_offset: msgs_read_offset_in_last_anchor,
                };
                reader_state.range.current_position = curr_ext_key;
                if reader_state.range.current_position > reader_state.range.to {
                    reader_state.range.to = reader_state.range.current_position;
                }

                prev_to_reached = anchor_id > prev_to.anchor_id
                    || (anchor_id == prev_to.anchor_id
                        && msgs_read_offset_in_last_anchor == prev_to.msgs_offset);

                if for_shard_id.contains_address(&ext_msg.info.dst) {
                    let target_partition = partition_router.get_partition(None, &ext_msg.info.dst);
                    let par_metrics = metrics_by_partitions.get_mut(target_partition);
                    par_metrics.add_to_message_groups_timer.start();

                    // Buffer only messages beyond the partition's
                    // processed-to key.
                    let processed_to = processed_to_by_partitions.get(&target_partition).unwrap();
                    if &curr_ext_key > processed_to {
                        let reader_state_by_partition = reader_state
                            .by_partitions
                            .get_mut(&target_partition)
                            .with_context(|| format!(
                                "target partition {} should exist in range reader state (seqno={})",
                                target_partition, seqno,
                            ))?;
                        reader_state_by_partition
                            .buffer
                            .add_message(ParsedMessage::from_ext(
                                ext_msg.info.clone(),
                                ext_msg.cell.clone(),
                                true,
                                anchor.chain_time,
                            ));
                        par_metrics
                            .add_to_msgs_groups_ops_count
                            .saturating_add_assign(1);
                    }
                    par_metrics.add_to_message_groups_timer.stop();

                    par_metrics.read_ext_msgs_count += 1;
                    total_msgs_imported += 1;
                    msgs_imported_from_last_anchor += 1;

                    tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                        anchor_id,
                        "imported ext_msg dst: {}", ext_msg.info.dst,
                    );

                    // Re-evaluate fill state after every buffered message.
                    (max_fill_state_by_count, max_fill_state_by_slots) =
                        get_max_buffers_fill_state(buffer_limits_by_partitions, reader_state)?;
                    has_filled_buffer = matches!(
                        (&max_fill_state_by_count, &max_fill_state_by_slots),
                        (BufferFillStateByCount::IsFull, _) | (_, BufferFillStateBySlots::CanFill)
                    );
                }
            }
            // Stopped with an unread message for this shard still in the
            // anchor — remember that externals remain pending.
            else if for_shard_id.contains_address(&ext_msg.info.dst) {
                has_pending_externals_in_last_read_anchor = true;
                break;
            }
        }

        tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
            anchor_id,
            msgs_read_offset_in_last_anchor,
            msgs_imported_from_last_anchor,
        );

        // Anchor fully consumed: drop it from the cache.
        if anchor.externals.len() == msgs_read_offset_in_last_anchor as usize {
            assert_eq!(next_idx, 0);
            anchors_cache.pop_front();
            tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                anchor_id,
                "anchor just fully read, removed from anchors cache",
            );
        }

        if read_mode == ReadNextExternalsMode::ToPreviuosReadTo && prev_to_reached {
            tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
                "stopped reading externals when prev_to reached: ({}, {})",
                prev_to.anchor_id, prev_to.msgs_offset,
            );
            reader_state.fully_read = true;
            break;
        }

        if has_filled_buffer {
            break;
        }
    }

    if matches!(max_fill_state_by_slots, BufferFillStateBySlots::CanFill) {
        tracing::debug!(target: tracing_targets::COLLATOR,
            reader_state = ?DebugExternalsRangeReaderState(reader_state),
            "externals reader: can fully fill all slots in message group",
        );
    } else if matches!(max_fill_state_by_count, BufferFillStateByCount::IsFull) {
        tracing::debug!(target: tracing_targets::COLLATOR,
            max_msgs_limits = ?DebugIter(buffer_limits_by_partitions.iter().map(|(par_id, limits)| (par_id, limits.max_count))),
            reader_state = ?DebugExternalsRangeReaderState(reader_state),
            "externals reader: messages buffers filled up to limits",
        );
    }

    // Work out whether pending externals remain within this range.
    let has_pending_externals_in_range =
        if read_mode == ReadNextExternalsMode::ToPreviuosReadTo && prev_to_reached {
            false
        } else if has_pending_externals_in_last_read_anchor {
            true
        } else if read_mode == ReadNextExternalsMode::ToPreviuosReadTo {
            anchors_cache.check_has_pending_externals_in_range(&prev_to)
        } else {
            anchors_cache.has_pending_externals()
        };

    tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
        total_msgs_imported,
        count_expired_messages,
        has_pending_externals_in_last_read_anchor,
        has_pending_externals_in_range,
        ?read_mode,
    );

    metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
        .decrement((total_msgs_imported + count_expired_messages) as f64);
    if count_expired_messages > 0 {
        metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels)
            .increment(count_expired_messages);
    }

    // The read timer on partition 0 also covered time spent adding to
    // message groups — subtract that part out.
    {
        let add_to_msgs_groups_total_elapsed =
            metrics_by_partitions.add_to_message_groups_total_elapsed();
        let par_0_metrics = metrics_by_partitions.get_mut(QueuePartitionIdx::ZERO);
        par_0_metrics.read_ext_messages_timer.stop();
        par_0_metrics.read_ext_messages_timer.total_elapsed -= add_to_msgs_groups_total_elapsed;
    }

    Ok(ReadExternalsRangeResult {
        last_read_to_anchor_chain_time,
        max_fill_state_by_count,
        max_fill_state_by_slots,
        metrics_by_partitions,
    })
}
/// Aggregates fill states across all partitions of a range: the result
/// is `IsFull` / `CanFill` as soon as any single partition reports it.
pub fn get_max_buffers_fill_state(
    buffer_limits_by_partitions: &BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
    reader_state: &ExternalsRangeReaderState,
) -> Result<(BufferFillStateByCount, BufferFillStateBySlots)> {
    let mut by_count = BufferFillStateByCount::NotFull;
    let mut by_slots = BufferFillStateBySlots::CanNotFill;

    for (par_id, par) in reader_state.by_partitions.iter() {
        let limits = get_buffer_limits_by_partition(buffer_limits_by_partitions, par_id)?;
        let (par_by_count, par_by_slots) = par.buffer.check_is_filled(&limits);

        if matches!(par_by_count, BufferFillStateByCount::IsFull) {
            by_count = BufferFillStateByCount::IsFull;
        }
        if matches!(par_by_slots, BufferFillStateBySlots::CanFill) {
            by_slots = BufferFillStateBySlots::CanFill;
        }

        // Both limits already hit — no point scanning further partitions.
        if matches!(by_count, BufferFillStateByCount::IsFull)
            && matches!(by_slots, BufferFillStateBySlots::CanFill)
        {
            break;
        }
    }

    Ok((by_count, by_slots))
}
fn get_buffer_limits_by_partition(
buffer_limits_by_partitions: &BTreeMap<QueuePartitionIdx, MessagesBufferLimits>,
partitions_id: &QueuePartitionIdx,
) -> Result<MessagesBufferLimits> {
buffer_limits_by_partitions
.get(partitions_id)
.with_context(|| {
format!(
"externals range reader does not contain buffer limits for partition {}",
partitions_id
)
})
.cloned()
}