use moka::sync::Cache;
use std::collections::HashSet;
use std::sync::OnceLock;
use std::time::Duration;
use crate::constants::{ALL_SHARD_ID, METACHAIN_SHARD_ID};
/// Process-wide cache of parsed topics, keyed by the full topic string.
///
/// Left uninitialised here; `get_topic_cache` builds it on first access.
static TOPIC_CACHE: OnceLock<Cache<String, TopicInfo>> = OnceLock::new();
/// Returns the shared topic cache, building it the first time it is requested.
///
/// The cache is configured with a capacity limit and a per-entry time-to-live so
/// that topic strings cannot make it grow without bound.
fn get_topic_cache() -> &'static Cache<String, TopicInfo> {
    /// Upper bound handed to the cache builder as its maximum capacity.
    const MAX_ENTRIES: u64 = 10_000;
    /// Time-to-live applied to every cached entry (one hour).
    const ENTRY_TTL: Duration = Duration::from_secs(3600);

    TOPIC_CACHE.get_or_init(|| {
        Cache::builder()
            .max_capacity(MAX_ENTRIES)
            .time_to_live(ENTRY_TTL)
            .build()
    })
}
/// Test-only helper: reports the entry count of the shared topic cache.
///
/// NOTE(review): moka documents `entry_count` as an estimate that may lag behind
/// recent inserts until the cache's pending maintenance has run — confirm this
/// is acceptable before asserting exact values on the result.
#[cfg(test)]
pub fn get_cache_len() -> u64 {
    let cache = get_topic_cache();
    cache.entry_count()
}
/// Base name of the transactions topic, without any shard suffix.
///
/// Passed as the `base` argument when `transaction_topics_from_shards` builds its topics.
pub const TRANSACTIONS_BASE_TOPIC: &str = "transactions";
/// The topic families this module knows about, identified independently of any
/// shard-routing suffix.
///
/// The string form of each variant is returned by `BaseTopic::base_name`; the
/// full list of variants is returned by `BaseTopic::iter`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BaseTopic {
Transactions,
UnsignedTransactions,
RewardsTransactions,
ShardBlocks,
/// Note: its base name is "txBlockBodies", not "miniBlocks".
MiniBlocks,
PeerChangeBlockBodies,
/// `generate_topics` emits only the bare base name for this variant.
MetachainBlocks,
AccountTrieNodes,
ValidatorTrieNodes,
Consensus,
HeartbeatV2,
/// `generate_topics` emits only the bare base name for this variant.
PeerAuthentication,
/// `generate_topics` emits only the bare base name for this variant.
Connection,
/// `generate_topics` emits only the bare base name for this variant.
ValidatorInfo,
EquivalentProofs,
}
impl BaseTopic {
    /// Returns every variant, in declaration order.
    ///
    /// Despite its name this returns a slice rather than an iterator.
    pub const fn iter() -> &'static [Self] {
        &[
            Self::Transactions,
            Self::UnsignedTransactions,
            Self::RewardsTransactions,
            Self::ShardBlocks,
            Self::MiniBlocks,
            Self::PeerChangeBlockBodies,
            Self::MetachainBlocks,
            Self::AccountTrieNodes,
            Self::ValidatorTrieNodes,
            Self::Consensus,
            Self::HeartbeatV2,
            Self::PeerAuthentication,
            Self::Connection,
            Self::ValidatorInfo,
            Self::EquivalentProofs,
        ]
    }

    /// Returns the topic string for this variant, without any shard suffix.
    pub const fn base_name(self) -> &'static str {
        match self {
            Self::Transactions => "transactions",
            Self::UnsignedTransactions => "unsignedTransactions",
            Self::RewardsTransactions => "rewardsTransactions",
            Self::ShardBlocks => "shardBlocks",
            Self::MiniBlocks => "txBlockBodies",
            Self::PeerChangeBlockBodies => "peerChangeBlockBodies",
            Self::MetachainBlocks => "metachainBlocks",
            Self::AccountTrieNodes => "accountTrieNodes",
            Self::ValidatorTrieNodes => "validatorTrieNodes",
            Self::Consensus => "consensus",
            Self::HeartbeatV2 => "heartbeatV2",
            Self::PeerAuthentication => "peerAuthentication",
            Self::Connection => "connection",
            Self::ValidatorInfo => "validatorInfo",
            Self::EquivalentProofs => "equivalentProofs",
        }
    }

    /// Looks up the variant whose base name is exactly `name`.
    ///
    /// Returns `None` when no variant matches; the comparison is case-sensitive.
    pub fn from_name(name: &str) -> Option<Self> {
        for &candidate in Self::iter() {
            if candidate.base_name() == name {
                return Some(candidate);
            }
        }
        None
    }

    /// Splits a full topic string into its base topic and the remaining suffix.
    ///
    /// The returned suffix is either empty (the topic is exactly a base name) or
    /// starts with `_`. Returns `None` when the topic does not begin with a
    /// known base name followed by end-of-string or `_`.
    pub fn classify_topic(topic: &str) -> Option<(Self, &str)> {
        Self::iter().iter().find_map(|&base| {
            let rest = topic.strip_prefix(base.base_name())?;
            // Guard against a base name that is merely a prefix of a longer word.
            (rest.is_empty() || rest.starts_with('_')).then_some((base, rest))
        })
    }

    /// Reports whether `generate_topics` adds shard-suffixed topics for this variant.
    fn uses_shard_identifiers(self) -> bool {
        match self {
            Self::MetachainBlocks
            | Self::PeerAuthentication
            | Self::Connection
            | Self::ValidatorInfo => false,
            Self::Transactions
            | Self::UnsignedTransactions
            | Self::RewardsTransactions
            | Self::ShardBlocks
            | Self::MiniBlocks
            | Self::PeerChangeBlockBodies
            | Self::AccountTrieNodes
            | Self::ValidatorTrieNodes
            | Self::Consensus
            | Self::HeartbeatV2
            | Self::EquivalentProofs => true,
        }
    }

    /// Builds the topic strings for this base topic.
    ///
    /// The bare base name always comes first. For variants that use shard
    /// identifiers, the topics produced by `generate_pair_topics` for the same
    /// arguments follow it.
    pub fn generate_topics(
        self,
        shards: &[u32],
        include_metachain: bool,
        include_all: bool,
    ) -> Vec<String> {
        let name = self.base_name();
        let mut topics = vec![name.to_owned()];
        if self.uses_shard_identifiers() {
            topics.extend(generate_pair_topics(
                name,
                shards,
                include_metachain,
                include_all,
            ));
        }
        topics
    }
}
/// The result of parsing a full topic string: which base topic it belongs to
/// and how its shard suffix routes it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicInfo {
/// Base topic matched at the start of the topic string.
pub base: BaseTopic,
/// Routing parsed from whatever followed the base name.
pub routing: TopicRouting,
}
impl TopicInfo {
    /// Parses a full topic string into its base topic and routing.
    ///
    /// Returns `None` when the base name is not recognised or the suffix is
    /// not a valid routing suffix.
    pub fn parse(topic: &str) -> Option<Self> {
        BaseTopic::classify_topic(topic).and_then(|(base, suffix)| {
            parse_routing_suffix(suffix).map(|routing| Self { base, routing })
        })
    }

    /// Same as [`TopicInfo::parse`], but consults the shared topic cache first.
    ///
    /// Only successful parses are stored; a topic that fails to parse is
    /// re-parsed on every call and never inserted into the cache.
    pub fn parse_cached(topic: &str) -> Option<Self> {
        let cache = get_topic_cache();
        cache.get(topic).or_else(|| {
            let info = Self::parse(topic)?;
            cache.insert(topic.to_owned(), info.clone());
            Some(info)
        })
    }
}
/// Routing information parsed from the suffix of a topic string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicRouting {
/// The topic carried no shard tokens (empty suffix, or a lone `_`).
Global,
/// The shards named by the `_`-separated suffix tokens, in the order they appeared.
Target(Vec<TopicShard>),
}
impl TopicRouting {
    /// Returns `true` for [`TopicRouting::Global`] and `false` for any target list.
    pub fn is_global(&self) -> bool {
        match self {
            Self::Global => true,
            Self::Target(_) => false,
        }
    }

    /// Returns the targeted shards, or an empty slice for global routing.
    pub fn shards(&self) -> &[TopicShard] {
        if let Self::Target(targets) = self {
            targets.as_slice()
        } else {
            &[]
        }
    }
}
/// A single shard token taken from a topic suffix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicShard {
/// A numeric shard id that is neither `METACHAIN_SHARD_ID` nor `ALL_SHARD_ID`.
Shard(u32),
/// The token `META` (any ASCII case) or the numeric value `METACHAIN_SHARD_ID`.
Metachain,
/// The token `ALL` (any ASCII case) or the numeric value `ALL_SHARD_ID`.
All,
}
impl TopicShard {
    /// Converts one `_`-separated suffix token into a shard.
    ///
    /// Accepted tokens:
    /// - `META` / `ALL` in any ASCII case,
    /// - a decimal `u32` made of ASCII digits only; the values
    ///   `METACHAIN_SHARD_ID` and `ALL_SHARD_ID` map to the matching named
    ///   variants instead of `Shard`.
    ///
    /// Returns `None` for anything else, including the empty token, values that
    /// overflow `u32`, and signed forms such as `+1`.
    fn from_token(token: &str) -> Option<Self> {
        if token.eq_ignore_ascii_case("META") {
            return Some(Self::Metachain);
        }
        if token.eq_ignore_ascii_case("ALL") {
            return Some(Self::All);
        }
        // `u32::from_str` tolerates a leading `+`, which would let a non-canonical
        // token such as `+1` alias shard 1. Only plain digit strings are shard ids.
        if !token.bytes().all(|byte| byte.is_ascii_digit()) {
            return None;
        }
        token.parse::<u32>().ok().map(|v| match v {
            METACHAIN_SHARD_ID => TopicShard::Metachain,
            ALL_SHARD_ID => TopicShard::All,
            _ => TopicShard::Shard(v),
        })
    }

    /// Returns the numeric id of a regular shard.
    ///
    /// `Metachain` and `All` yield `None`.
    pub fn as_u32(&self) -> Option<u32> {
        match self {
            Self::Shard(value) => Some(*value),
            // Listed explicitly so a new variant forces a decision here.
            Self::Metachain | Self::All => None,
        }
    }
}
/// Parses the part of a topic that follows its base name.
///
/// - An empty suffix, or a lone `_`, means global routing.
/// - Otherwise the suffix must start with `_`; the remainder is split on `_`
///   and every piece must be a valid shard token.
///
/// Returns `None` when the leading `_` is missing or any token is invalid
/// (an empty token between or after separators counts as invalid).
fn parse_routing_suffix(suffix: &str) -> Option<TopicRouting> {
    let tokens = match suffix {
        "" | "_" => return Some(TopicRouting::Global),
        other => other.strip_prefix('_')?,
    };
    // Collecting into `Option<Vec<_>>` stops at the first token that fails.
    tokens
        .split('_')
        .map(TopicShard::from_token)
        .collect::<Option<Vec<_>>>()
        .map(TopicRouting::Target)
}
/// Renders a shard id as a topic suffix fragment with a leading `_`.
///
/// `METACHAIN_SHARD_ID` becomes `_META`, `ALL_SHARD_ID` becomes `_ALL`, and any
/// other id is written in decimal.
fn shard_id_to_string(shard_id: u32) -> String {
    let label = match shard_id {
        METACHAIN_SHARD_ID => "META".to_owned(),
        ALL_SHARD_ID => "ALL".to_owned(),
        _ => shard_id.to_string(),
    };
    format!("_{label}")
}
/// Builds the suffix identifying communication between two shards.
///
/// - If either id is `ALL_SHARD_ID`, the result is the `ALL` identifier alone.
/// - If both ids are equal, the result is that single shard's identifier.
/// - Otherwise both identifiers are concatenated with the numerically smaller
///   id first, so the result does not depend on argument order.
pub fn communication_identifier_between_shards(shard_id1: u32, shard_id2: u32) -> String {
    if [shard_id1, shard_id2].contains(&ALL_SHARD_ID) {
        return shard_id_to_string(ALL_SHARD_ID);
    }

    let low = shard_id1.min(shard_id2);
    let high = shard_id1.max(shard_id2);

    let mut identifier = shard_id_to_string(low);
    if low != high {
        identifier.push_str(&shard_id_to_string(high));
    }
    identifier
}
/// Builds the full topic for `base` between two shards.
///
/// The result is `base` immediately followed by the identifier returned by
/// `communication_identifier_between_shards` for the two ids.
pub fn broadcast_topic(base: &str, shard_id1: u32, shard_id2: u32) -> String {
    let identifier = communication_identifier_between_shards(shard_id1, shard_id2);
    [base, identifier.as_str()].concat()
}
/// Returns the shard-suffixed transaction topics for the given shards, sorted
/// lexicographically.
///
/// The topics come from `generate_pair_topics` with `TRANSACTIONS_BASE_TOPIC`
/// as the base; the bare base name itself is not included.
pub fn transaction_topics_from_shards(
    shards: &[u32],
    include_metachain: bool,
    include_all: bool,
) -> Vec<String> {
    let mut sorted_topics =
        generate_pair_topics(TRANSACTIONS_BASE_TOPIC, shards, include_metachain, include_all);
    // Equal strings are indistinguishable, so an unstable sort yields the same order.
    sorted_topics.sort_unstable();
    sorted_topics
}
pub fn all_topics_for_shards(
shards: &[u32],
include_metachain: bool,
include_all: bool,
) -> Vec<String> {
let mut seen = HashSet::new();
let mut topics = Vec::new();
for base in BaseTopic::iter() {
for topic in base.generate_topics(shards, include_metachain, include_all) {
if seen.insert(topic.clone()) {
topics.push(topic);
}
}
}
topics.sort();
topics
}
/// Builds the shard-suffixed topics of `base` for every unordered pair of shards.
///
/// The shard list is `shards`, plus `METACHAIN_SHARD_ID` when `include_metachain`
/// is set, sorted and de-duplicated. Each shard is paired with itself and with
/// every later shard in that list. When `include_all` is set, the `ALL` topic is
/// appended last. Topics are returned in generation order with repeats dropped;
/// the bare `base` name is not included.
fn generate_pair_topics(
    base: &str,
    shards: &[u32],
    include_metachain: bool,
    include_all: bool,
) -> Vec<String> {
    let mut ids: Vec<u32> = shards
        .iter()
        .copied()
        .chain(include_metachain.then_some(METACHAIN_SHARD_ID))
        .collect();
    ids.sort_unstable();
    ids.dedup();

    // Pair each id with itself and with every id that sorts after it.
    let pair_topics = ids.iter().enumerate().flat_map(|(position, &first)| {
        ids[position..]
            .iter()
            .map(move |&second| broadcast_topic(base, first, second))
    });
    let all_topic = include_all.then(|| broadcast_topic(base, ALL_SHARD_ID, ALL_SHARD_ID));

    // Different pairs can produce the same string (for example any pair that
    // involves the ALL id), so keep only the first occurrence of each topic.
    let mut seen = HashSet::new();
    pair_topics
        .chain(all_topic)
        .filter(|topic| seen.insert(topic.clone()))
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the identifier for ordered, reversed, metachain, ALL and same-shard pairs.
    #[test]
    fn communication_identifier_matches_go_logic() {
        let cases = [
            (0, 1, "_0_1"),
            (2, 0, "_0_2"),
            (METACHAIN_SHARD_ID, 1, "_1_META"),
            (ALL_SHARD_ID, 0, "_ALL"),
            (2, 2, "_2"),
        ];
        for (first, second, expected) in cases {
            assert_eq!(
                communication_identifier_between_shards(first, second),
                expected.to_owned()
            );
        }
    }

    /// Spot-checks that intra-shard, cross-shard, metachain and ALL topics are all generated.
    #[test]
    fn transaction_topics_cover_all_pairs() {
        let topics = transaction_topics_from_shards(&[0, 1, 2], true, true);
        let expected = [
            "transactions_0",
            "transactions_0_1",
            "transactions_1_2",
            "transactions_0_META",
            "transactions_META",
            "transactions_ALL",
        ];
        for name in expected {
            assert!(topics.iter().any(|topic| topic.as_str() == name));
        }
    }

    /// Checks that both bare control topics and shard-suffixed topics show up.
    #[test]
    fn all_topics_include_control_channels() {
        let topics = all_topics_for_shards(&[0, 1], true, true);
        let has_exact = |name: &str| topics.iter().any(|topic| topic.as_str() == name);
        let has_prefix = |prefix: &str| topics.iter().any(|topic| topic.starts_with(prefix));

        assert!(has_exact("connection"));
        assert!(has_exact("peerAuthentication"));
        assert!(has_prefix("consensus_"));
        assert!(has_prefix("transactions_"));
        assert!(has_exact("metachainBlocks"));
    }

    #[test]
    fn topic_info_parses_cross_shard_transactions() {
        let info = TopicInfo::parse("transactions_0_2").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            TopicRouting::Target(vec![TopicShard::Shard(0), TopicShard::Shard(2)]),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    #[test]
    fn topic_info_parses_meta_route() {
        let info = TopicInfo::parse("transactions_META").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            TopicRouting::Target(vec![TopicShard::Metachain]),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    #[test]
    fn topic_info_parses_global_topic() {
        let info = TopicInfo::parse("validatorInfo").expect("parse");
        assert_eq!(info.base, BaseTopic::ValidatorInfo);
        assert!(info.routing.is_global());
    }

    /// The numeric metachain id must map to the named variant, not `Shard`.
    #[test]
    fn topic_shard_from_numeric_metachain() {
        let info = TopicInfo::parse("transactions_4294967295").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            TopicRouting::Target(vec![TopicShard::Metachain]),
            "expected Metachain, got {:?}",
            info.routing
        );
    }

    /// The numeric ALL id must map to the named variant, not `Shard`.
    #[test]
    fn topic_shard_from_numeric_all() {
        let info = TopicInfo::parse("transactions_4294967280").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            TopicRouting::Target(vec![TopicShard::All]),
            "expected All, got {:?}",
            info.routing
        );
    }

    /// Exercises the shared cache: failed parses must not be stored, and
    /// re-parsing already cached topics must not add entries.
    ///
    /// NOTE(review): these assertions read the cache's entry count, which moka
    /// documents as an estimate that can lag behind recent inserts — confirm
    /// the test is not timing-sensitive.
    #[test]
    fn cache_bounds_memory_usage() {
        let initial_len = get_cache_len();

        // Phase 1: unparseable topics leave the cache untouched.
        for i in 0..1000 {
            let invalid_topic = format!("invalid_topic_{}", i);
            assert!(TopicInfo::parse_cached(&invalid_topic).is_none());
        }
        assert_eq!(
            get_cache_len(),
            initial_len,
            "Invalid topics should not be cached"
        );

        // Phase 2: parseable topics are stored, at most one entry each.
        let valid_topics: Vec<String> = (0..100)
            .map(|i| format!("transactions_{}_META", i))
            .collect();
        for valid_topic in &valid_topics {
            assert!(TopicInfo::parse_cached(valid_topic).is_some());
        }
        let len_after_valid = get_cache_len();
        assert!(
            len_after_valid > initial_len,
            "Valid topics should be cached"
        );
        assert!(
            len_after_valid <= initial_len + 100,
            "Cache size shouldn't exceed input count"
        );

        // Phase 3: hitting the same topics again must not grow the cache.
        for valid_topic in &valid_topics {
            let _ = TopicInfo::parse_cached(valid_topic);
        }
        let len_final = get_cache_len();
        assert!(
            len_final <= initial_len + 100,
            "Cache grew unbounded on re-access: {} -> {}",
            len_after_valid,
            len_final
        );
    }
}