use std::collections::BTreeMap;
use std::sync::Arc;
use anyhow::{Context, Result};
use tycho_types::models::{IntAddr, ShardIdent};
use tycho_util::transactional::option::TransactionalOption;
use tycho_util::transactional::value::TransactionalValue;
use tycho_util_proc::Transactional;
use crate::collator::messages_buffer::MessagesBuffer;
use crate::collator::messages_reader::state::ShardReaderState;
use crate::internal_queue::types::stats::QueueStatistics;
use crate::types::ProcessedTo;
use crate::types::processed_upto::InternalsRangeStuff;
/// State kept by a reader of one internal-messages range.
///
/// Every field is either a `MessagesBuffer` or a `Transactional*` wrapper, and
/// the struct derives `Transactional`.
/// NOTE(review): presumably this lets the whole state be committed or rolled
/// back as a unit — confirm against the `Transactional` derive.
#[derive(Transactional, Default)]
pub struct InternalsRangeReaderState {
/// Messages buffer owned by this range reader.
pub buffer: MessagesBuffer,
/// Optional, shared (`Arc`) queue statistics.
/// NOTE(review): presumably the statistics for the whole range — confirm.
pub msgs_stats: TransactionalValue<Option<Arc<QueueStatistics>>>,
/// Optional queue statistics queried by
/// `contains_account_addr_in_remaning_msgs_stats`.
/// The field name is misspelled ("remaning"), but it is part of the public
/// interface, so it is kept as is.
pub remaning_msgs_stats: TransactionalOption<QueueStatistics>,
/// Optional queue statistics handed out mutably by `get_read_stats_mut`,
/// which fails when no value is present.
pub read_stats: TransactionalOption<QueueStatistics>,
/// Per-shard reader states, keyed by shard identifier.
pub shards: TransactionalValue<BTreeMap<ShardIdent, ShardReaderState>>,
/// Skip offset; `from_range_info` copies it from `InternalsRangeStuff::skip_offset`.
pub skip_offset: TransactionalValue<u32>,
/// Processed offset; `from_range_info` copies it from
/// `InternalsRangeStuff::processed_offset`.
pub processed_offset: TransactionalValue<u32>,
}
impl InternalsRangeReaderState {
    /// Reports whether `remaning_msgs_stats` holds an entry for `account_addr`.
    ///
    /// Returns `false` when `remaning_msgs_stats` holds no value at all.
    pub fn contains_account_addr_in_remaning_msgs_stats(&self, account_addr: &IntAddr) -> bool {
        matches!(
            &self.remaning_msgs_stats.inner(),
            Some(stats) if stats.statistics().contains_key(account_addr)
        )
    }

    /// Returns `true` when no shard reader state reports that it is not fully
    /// read. With no shards at all the result is `true`.
    pub fn is_fully_read(&self) -> bool {
        !self.shards.values().any(|shard| !shard.is_fully_read())
    }

    /// Builds a reader state from the description of an internals range.
    ///
    /// The skip and processed offsets are copied from `range_info`, one
    /// `ShardReaderState` is created for every shard listed in
    /// `range_info.shards`, and all remaining fields keep their default values.
    pub fn from_range_info(range_info: &InternalsRangeStuff, processed_to: &ProcessedTo) -> Self {
        let mut state = Self {
            skip_offset: range_info.skip_offset.into(),
            processed_offset: range_info.processed_offset.into(),
            ..Self::default()
        };

        for (shard_id, shard_range) in &range_info.shards {
            // A shard without an entry in `processed_to` falls back to the
            // default value.
            let shard_processed_to = match processed_to.get(shard_id) {
                Some(value) => *value,
                None => Default::default(),
            };
            let shard_state = ShardReaderState::from_range_info(shard_range, shard_processed_to);
            state.shards.insert(*shard_id, shard_state);
        }

        state
    }

    /// Gives mutable access to the read statistics.
    ///
    /// # Errors
    ///
    /// Returns an error when `read_stats` holds no value.
    #[inline]
    pub fn get_read_stats_mut(&mut self) -> Result<&mut QueueStatistics> {
        let read_stats = self.read_stats.inner_mut();
        read_stats.context("internals range reader state: read_stats is not loaded")
    }
}