1use moka::sync::Cache;
2use std::collections::HashSet;
3use std::sync::OnceLock;
4use std::time::Duration;
5
6use crate::constants::{ALL_SHARD_ID, METACHAIN_SHARD_ID};
7
8static TOPIC_CACHE: OnceLock<Cache<String, TopicInfo>> = OnceLock::new();
15
16fn get_topic_cache() -> &'static Cache<String, TopicInfo> {
17 TOPIC_CACHE.get_or_init(|| {
18 Cache::builder()
19 .max_capacity(10_000)
21 .time_to_live(Duration::from_secs(3600))
23 .build()
24 })
25}
26
/// Test-only helper: reports the entry count of the shared topic cache.
///
/// NOTE(review): moka describes `entry_count` as an estimate that can lag
/// behind recent writes until pending maintenance has run — confirm against
/// the moka version in use before relying on exact values.
#[cfg(test)]
pub fn get_cache_len() -> u64 {
    let cache = get_topic_cache();
    cache.entry_count()
}
32
/// Base name of the transactions topic; [`transaction_topics_from_shards`]
/// uses it as the prefix of every topic it generates.
pub const TRANSACTIONS_BASE_TOPIC: &str = "transactions";
35
/// The topic families this module can name and parse.
///
/// Each variant maps to the string returned by `BaseTopic::base_name`; the
/// per-variant notes below quote that string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BaseTopic {
    /// Base name `transactions`.
    Transactions,
    /// Base name `unsignedTransactions`.
    UnsignedTransactions,
    /// Base name `rewardsTransactions`.
    RewardsTransactions,
    /// Base name `shardBlocks`.
    ShardBlocks,
    /// Base name `txBlockBodies`.
    MiniBlocks,
    /// Base name `peerChangeBlockBodies`.
    PeerChangeBlockBodies,
    /// Base name `metachainBlocks`.
    MetachainBlocks,
    /// Base name `accountTrieNodes`.
    AccountTrieNodes,
    /// Base name `validatorTrieNodes`.
    ValidatorTrieNodes,
    /// Base name `consensus`.
    Consensus,
    /// Base name `heartbeatV2`.
    HeartbeatV2,
    /// Base name `peerAuthentication`.
    PeerAuthentication,
    /// Base name `connection`.
    Connection,
    /// Base name `validatorInfo`.
    ValidatorInfo,
    /// Base name `equivalentProofs`.
    EquivalentProofs,
}
70
71impl BaseTopic {
72 pub const fn iter() -> &'static [Self] {
74 &[
75 Self::Transactions,
76 Self::UnsignedTransactions,
77 Self::RewardsTransactions,
78 Self::ShardBlocks,
79 Self::MiniBlocks,
80 Self::PeerChangeBlockBodies,
81 Self::MetachainBlocks,
82 Self::AccountTrieNodes,
83 Self::ValidatorTrieNodes,
84 Self::Consensus,
85 Self::HeartbeatV2,
86 Self::PeerAuthentication,
87 Self::Connection,
88 Self::ValidatorInfo,
89 Self::EquivalentProofs,
90 ]
91 }
92
93 pub const fn base_name(self) -> &'static str {
95 match self {
96 Self::Transactions => "transactions",
97 Self::UnsignedTransactions => "unsignedTransactions",
98 Self::RewardsTransactions => "rewardsTransactions",
99 Self::ShardBlocks => "shardBlocks",
100 Self::MiniBlocks => "txBlockBodies",
101 Self::PeerChangeBlockBodies => "peerChangeBlockBodies",
102 Self::MetachainBlocks => "metachainBlocks",
103 Self::AccountTrieNodes => "accountTrieNodes",
104 Self::ValidatorTrieNodes => "validatorTrieNodes",
105 Self::Consensus => "consensus",
106 Self::HeartbeatV2 => "heartbeatV2",
107 Self::PeerAuthentication => "peerAuthentication",
108 Self::Connection => "connection",
109 Self::ValidatorInfo => "validatorInfo",
110 Self::EquivalentProofs => "equivalentProofs",
111 }
112 }
113
114 pub fn from_name(name: &str) -> Option<Self> {
116 Self::iter()
117 .iter()
118 .copied()
119 .find(|base| base.base_name() == name)
120 }
121
122 pub fn classify_topic(topic: &str) -> Option<(Self, &str)> {
124 for base in Self::iter() {
125 let base_name = base.base_name();
126 if topic == base_name {
127 return Some((*base, ""));
128 }
129 if let Some(suffix) = topic.strip_prefix(base_name).filter(|s| s.starts_with('_')) {
130 return Some((*base, suffix));
131 }
132 }
133 None
134 }
135
136 fn uses_shard_identifiers(self) -> bool {
137 !matches!(
138 self,
139 Self::MetachainBlocks
140 | Self::PeerAuthentication
141 | Self::Connection
142 | Self::ValidatorInfo
143 )
144 }
145
146 pub fn generate_topics(
148 self,
149 shards: &[u32],
150 include_metachain: bool,
151 include_all: bool,
152 ) -> Vec<String> {
153 let mut topics = Vec::new();
154 topics.push(self.base_name().to_owned());
155
156 if self.uses_shard_identifiers() {
157 topics.extend(generate_pair_topics(
158 self.base_name(),
159 shards,
160 include_metachain,
161 include_all,
162 ));
163 }
164
165 topics
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct TopicInfo {
172 pub base: BaseTopic,
174 pub routing: TopicRouting,
176}
177
178impl TopicInfo {
179 pub fn parse(topic: &str) -> Option<Self> {
181 let (base, suffix) = BaseTopic::classify_topic(topic)?;
182 let routing = parse_routing_suffix(suffix)?;
183 Some(Self { base, routing })
184 }
185
186 pub fn parse_cached(topic: &str) -> Option<Self> {
191 let cache = get_topic_cache();
192
193 if let Some(cached) = cache.get(topic) {
195 return Some(cached);
196 }
197
198 let parsed = Self::parse(topic);
200
201 if let Some(info) = &parsed {
203 cache.insert(topic.to_owned(), info.clone());
204 }
205
206 parsed
207 }
208}
209
210#[derive(Debug, Clone, PartialEq, Eq)]
212pub enum TopicRouting {
213 Global,
215 Target(Vec<TopicShard>),
217}
218
219impl TopicRouting {
220 pub fn is_global(&self) -> bool {
222 matches!(self, Self::Global)
223 }
224
225 pub fn shards(&self) -> &[TopicShard] {
227 match self {
228 Self::Global => &[],
229 Self::Target(shards) => shards.as_slice(),
230 }
231 }
232}
233
/// One shard token taken from a topic's routing suffix.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TopicShard {
    /// A shard identified by number.
    Shard(u32),
    /// The metachain.
    Metachain,
    /// The "all shards" marker.
    All,
}
244
245impl TopicShard {
246 fn from_token(token: &str) -> Option<Self> {
247 if token.eq_ignore_ascii_case("META") {
248 return Some(Self::Metachain);
249 }
250 if token.eq_ignore_ascii_case("ALL") {
251 return Some(Self::All);
252 }
253 token.parse::<u32>().ok().map(|v| match v {
254 METACHAIN_SHARD_ID => TopicShard::Metachain,
255 ALL_SHARD_ID => TopicShard::All,
256 _ => TopicShard::Shard(v),
257 })
258 }
259
260 pub fn as_u32(&self) -> Option<u32> {
262 match self {
263 Self::Shard(value) => Some(*value),
264 _ => None,
265 }
266 }
267}
268
269fn parse_routing_suffix(suffix: &str) -> Option<TopicRouting> {
274 if suffix.is_empty() {
275 return Some(TopicRouting::Global);
276 }
277
278 let trimmed = suffix.strip_prefix('_')?;
279 if trimmed.is_empty() {
280 return Some(TopicRouting::Global);
281 }
282
283 let mut shards = Vec::new();
284 for token in trimmed.split('_') {
285 let shard = TopicShard::from_token(token)?;
286 shards.push(shard);
287 }
288
289 Some(TopicRouting::Target(shards))
290}
291
292fn shard_id_to_string(shard_id: u32) -> String {
296 match shard_id {
297 METACHAIN_SHARD_ID => "_META".to_owned(),
298 ALL_SHARD_ID => "_ALL".to_owned(),
299 _ => format!("_{shard_id}"),
300 }
301}
302
303pub fn communication_identifier_between_shards(shard_id1: u32, shard_id2: u32) -> String {
305 if shard_id1 == ALL_SHARD_ID || shard_id2 == ALL_SHARD_ID {
306 return shard_id_to_string(ALL_SHARD_ID);
307 }
308
309 if shard_id1 == shard_id2 {
310 return shard_id_to_string(shard_id1);
311 }
312
313 if shard_id1 < shard_id2 {
314 return format!(
315 "{}{}",
316 shard_id_to_string(shard_id1),
317 shard_id_to_string(shard_id2)
318 );
319 }
320
321 format!(
322 "{}{}",
323 shard_id_to_string(shard_id2),
324 shard_id_to_string(shard_id1)
325 )
326}
327
328pub fn broadcast_topic(base: &str, shard_id1: u32, shard_id2: u32) -> String {
331 format!(
332 "{base}{}",
333 communication_identifier_between_shards(shard_id1, shard_id2)
334 )
335}
336
337pub fn transaction_topics_from_shards(
339 shards: &[u32],
340 include_metachain: bool,
341 include_all: bool,
342) -> Vec<String> {
343 let mut topics = generate_pair_topics(
344 TRANSACTIONS_BASE_TOPIC,
345 shards,
346 include_metachain,
347 include_all,
348 );
349 topics.sort();
350 topics
351}
352
353pub fn all_topics_for_shards(
355 shards: &[u32],
356 include_metachain: bool,
357 include_all: bool,
358) -> Vec<String> {
359 let mut seen = HashSet::new();
360 let mut topics = Vec::new();
361
362 for base in BaseTopic::iter() {
363 for topic in base.generate_topics(shards, include_metachain, include_all) {
364 if seen.insert(topic.clone()) {
365 topics.push(topic);
366 }
367 }
368 }
369
370 topics.sort();
371 topics
372}
373
374fn generate_pair_topics(
379 base: &str,
380 shards: &[u32],
381 include_metachain: bool,
382 include_all: bool,
383) -> Vec<String> {
384 let mut shard_ids: Vec<u32> = shards.to_vec();
385 if include_metachain {
386 shard_ids.push(METACHAIN_SHARD_ID);
387 }
388 shard_ids.sort_unstable();
389 shard_ids.dedup();
390
391 let mut seen = HashSet::new();
392 let mut topics = Vec::new();
393
394 for (idx, shard_a) in shard_ids.iter().enumerate() {
395 for shard_b in &shard_ids[idx..] {
396 let topic = broadcast_topic(base, *shard_a, *shard_b);
397 if seen.insert(topic.clone()) {
398 topics.push(topic);
399 }
400 }
401 }
402
403 if include_all {
404 let topic = broadcast_topic(base, ALL_SHARD_ID, ALL_SHARD_ID);
405 if seen.insert(topic.clone()) {
406 topics.push(topic);
407 }
408 }
409
410 topics
411}
412
// Unit tests for topic naming, topic parsing and the parse cache.
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the identifier format: smaller id first, `META`/`ALL` keywords,
    // and a single fragment when both ids are equal.
    #[test]
    fn communication_identifier_matches_go_logic() {
        assert_eq!(
            communication_identifier_between_shards(0, 1),
            "_0_1".to_owned()
        );
        // Argument order must not matter.
        assert_eq!(
            communication_identifier_between_shards(2, 0),
            "_0_2".to_owned()
        );
        assert_eq!(
            communication_identifier_between_shards(METACHAIN_SHARD_ID, 1),
            "_1_META".to_owned()
        );
        // The "all" id overrides whatever the other id is.
        assert_eq!(
            communication_identifier_between_shards(ALL_SHARD_ID, 0),
            "_ALL".to_owned()
        );
        assert_eq!(
            communication_identifier_between_shards(2, 2),
            "_2".to_owned()
        );
    }

    // Spot-checks that intra-shard, cross-shard, metachain and "all" topics
    // are all generated for the transactions base.
    #[test]
    fn transaction_topics_cover_all_pairs() {
        let topics = transaction_topics_from_shards(&[0, 1, 2], true, true);
        assert!(topics.contains(&"transactions_0".to_owned()));
        assert!(topics.contains(&"transactions_0_1".to_owned()));
        assert!(topics.contains(&"transactions_1_2".to_owned()));
        assert!(topics.contains(&"transactions_0_META".to_owned()));
        assert!(topics.contains(&"transactions_META".to_owned()));
        assert!(topics.contains(&"transactions_ALL".to_owned()));
    }

    // Bases without shard identifiers must appear under their bare name;
    // bases with shard identifiers must have at least one suffixed topic.
    #[test]
    fn all_topics_include_control_channels() {
        let topics = all_topics_for_shards(&[0, 1], true, true);
        assert!(topics.contains(&"connection".to_owned()));
        assert!(topics.contains(&"peerAuthentication".to_owned()));
        assert!(topics.iter().any(|t| t.starts_with("consensus_")));
        assert!(topics.iter().any(|t| t.starts_with("transactions_")));
        assert!(topics.contains(&"metachainBlocks".to_owned()));
    }

    // A two-token numeric suffix parses into two `Shard` targets, in order.
    #[test]
    fn topic_info_parses_cross_shard_transactions() {
        let info = TopicInfo::parse("transactions_0_2").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert!(
            matches!(&info.routing, TopicRouting::Target(shards) if shards.len() == 2
                && matches!(shards[0], TopicShard::Shard(0))
                && matches!(shards[1], TopicShard::Shard(2))),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    // The `META` keyword parses into a single `Metachain` target.
    #[test]
    fn topic_info_parses_meta_route() {
        let info = TopicInfo::parse("transactions_META").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert!(
            matches!(&info.routing, TopicRouting::Target(shards) if shards.len() == 1
                && matches!(shards[0], TopicShard::Metachain)),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    // A bare base name with no suffix is a global topic.
    #[test]
    fn topic_info_parses_global_topic() {
        let info = TopicInfo::parse("validatorInfo").expect("parse");
        assert_eq!(info.base, BaseTopic::ValidatorInfo);
        assert!(info.routing.is_global());
    }

    // The numeric token 4294967295 must be recognised as the metachain
    // rather than as an ordinary shard number.
    #[test]
    fn topic_shard_from_numeric_metachain() {
        let info = TopicInfo::parse("transactions_4294967295").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert!(
            matches!(
                &info.routing,
                TopicRouting::Target(shards) if shards.len() == 1
                    && matches!(shards[0], TopicShard::Metachain)
            ),
            "expected Metachain, got {:?}",
            info.routing
        );
    }

    // The numeric token 4294967280 must be recognised as the "all" marker
    // rather than as an ordinary shard number.
    #[test]
    fn topic_shard_from_numeric_all() {
        let info = TopicInfo::parse("transactions_4294967280").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert!(
            matches!(
                &info.routing,
                TopicRouting::Target(shards) if shards.len() == 1
                    && matches!(shards[0], TopicShard::All)
            ),
            "expected All, got {:?}",
            info.routing
        );
    }

    // Exercises `parse_cached`: failed parses must not be stored, successful
    // ones must be, and re-parsing the same topics must not add entries.
    //
    // NOTE(review): every check compares `get_cache_len()` readings taken
    // immediately after cache writes; moka describes `entry_count` as an
    // estimate that may lag pending writes — confirm this cannot flake.
    #[test]
    fn cache_bounds_memory_usage() {
        // The cache is process-global, so measure relative to the current size.
        let initial_len = get_cache_len();

        // Phase 1: unparseable topics must leave the cache untouched.
        for i in 0..1000 {
            let invalid_topic = format!("invalid_topic_{}", i);
            let result = TopicInfo::parse_cached(&invalid_topic);
            assert!(result.is_none());
        }

        assert_eq!(
            get_cache_len(),
            initial_len,
            "Invalid topics should not be cached"
        );

        // Phase 2: 100 distinct valid topics should grow the cache by at most 100.
        for i in 0..100 {
            let valid_topic = format!("transactions_{}_META", i);
            let result = TopicInfo::parse_cached(&valid_topic);
            assert!(result.is_some());
        }

        let len_after_valid = get_cache_len();
        assert!(
            len_after_valid > initial_len,
            "Valid topics should be cached"
        );
        assert!(
            len_after_valid <= initial_len + 100,
            "Cache size shouldn't exceed input count"
        );

        // Phase 3: looking the same topics up again must not add new entries.
        for i in 0..100 {
            let valid_topic = format!("transactions_{}_META", i);
            let _ = TopicInfo::parse_cached(&valid_topic);
        }

        let len_final = get_cache_len();
        assert!(
            len_final <= initial_len + 100,
            "Cache grew unbounded on re-access: {} -> {}",
            len_after_valid,
            len_final
        );
    }
}