Skip to main content

mx_core/topics/
gossip.rs

1use moka::sync::Cache;
2use std::collections::HashSet;
3use std::sync::OnceLock;
4use std::time::Duration;
5
6use crate::constants::{ALL_SHARD_ID, METACHAIN_SHARD_ID};
7
8/// Global cache for parsed topic information to avoid repeated string parsing.
9///
10/// Uses `moka` for specialized caching with:
11/// - Cap on entries (10,000) to prevent memory exhaustion
12/// - TTL (1 hour) to expire stale entries
13/// - Thread-safe concurrent access
14static TOPIC_CACHE: OnceLock<Cache<String, TopicInfo>> = OnceLock::new();
15
16fn get_topic_cache() -> &'static Cache<String, TopicInfo> {
17    TOPIC_CACHE.get_or_init(|| {
18        Cache::builder()
19            // Max 10k entries (approx 1-2MB RAM) - plenty for legit topics
20            .max_capacity(10_000)
21            // 1 hour TTL - topics are long-lived but this prevents stale buildup
22            .time_to_live(Duration::from_secs(3600))
23            .build()
24    })
25}
26
/// Test-only helper reporting how many entries the topic cache holds.
///
/// `moka` applies inserts and evictions lazily, so `entry_count()` on its own
/// can lag behind recent writes. Pending maintenance is flushed first so that
/// tests comparing the size before and after a batch of lookups observe a
/// settled value.
#[cfg(test)]
pub fn get_cache_len() -> u64 {
    let cache = get_topic_cache();
    cache.run_pending_tasks();
    cache.entry_count()
}
32
33/// Base topic used when broadcasting regular transactions.
34pub const TRANSACTIONS_BASE_TOPIC: &str = "transactions";
35
/// Families of gossip topics used by `MultiversX` nodes.
///
/// Each variant corresponds to one canonical wire name (given in backticks
/// below); concrete topics may append shard identifiers to that name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BaseTopic {
    /// Regular, user-submitted transactions (`transactions`).
    Transactions,
    /// Unsigned transactions, i.e. smart contract results (`unsignedTransactions`).
    UnsignedTransactions,
    /// Reward transactions (`rewardsTransactions`).
    RewardsTransactions,
    /// Shard chain block headers (`shardBlocks`).
    ShardBlocks,
    /// Miniblocks carrying transaction batches (`txBlockBodies`).
    MiniBlocks,
    /// Block bodies describing peer changes (`peerChangeBlockBodies`).
    PeerChangeBlockBodies,
    /// Metachain block headers (`metachainBlocks`).
    MetachainBlocks,
    /// Trie nodes for account state synchronization (`accountTrieNodes`).
    AccountTrieNodes,
    /// Trie nodes for validator state synchronization (`validatorTrieNodes`).
    ValidatorTrieNodes,
    /// Consensus protocol messages (`consensus`).
    Consensus,
    /// Node heartbeat messages, version 2 (`heartbeatV2`).
    HeartbeatV2,
    /// Peer authentication messages (`peerAuthentication`).
    PeerAuthentication,
    /// Connection control messages (`connection`).
    Connection,
    /// Validator information (`validatorInfo`).
    ValidatorInfo,
    /// Equivalent proofs (`equivalentProofs`).
    ///
    /// NOTE(review): this was previously described as "equivocation proofs for
    /// slashing"; confirm the exact semantics against mx-chain-go.
    EquivalentProofs,
}
70
71impl BaseTopic {
72    /// Returns an iterator over every base topic variant.
73    pub const fn iter() -> &'static [Self] {
74        &[
75            Self::Transactions,
76            Self::UnsignedTransactions,
77            Self::RewardsTransactions,
78            Self::ShardBlocks,
79            Self::MiniBlocks,
80            Self::PeerChangeBlockBodies,
81            Self::MetachainBlocks,
82            Self::AccountTrieNodes,
83            Self::ValidatorTrieNodes,
84            Self::Consensus,
85            Self::HeartbeatV2,
86            Self::PeerAuthentication,
87            Self::Connection,
88            Self::ValidatorInfo,
89            Self::EquivalentProofs,
90        ]
91    }
92
93    /// Returns the canonical base name used in the gossip topic.
94    pub const fn base_name(self) -> &'static str {
95        match self {
96            Self::Transactions => "transactions",
97            Self::UnsignedTransactions => "unsignedTransactions",
98            Self::RewardsTransactions => "rewardsTransactions",
99            Self::ShardBlocks => "shardBlocks",
100            Self::MiniBlocks => "txBlockBodies",
101            Self::PeerChangeBlockBodies => "peerChangeBlockBodies",
102            Self::MetachainBlocks => "metachainBlocks",
103            Self::AccountTrieNodes => "accountTrieNodes",
104            Self::ValidatorTrieNodes => "validatorTrieNodes",
105            Self::Consensus => "consensus",
106            Self::HeartbeatV2 => "heartbeatV2",
107            Self::PeerAuthentication => "peerAuthentication",
108            Self::Connection => "connection",
109            Self::ValidatorInfo => "validatorInfo",
110            Self::EquivalentProofs => "equivalentProofs",
111        }
112    }
113
114    /// Attempts to resolve a base topic by its canonical name.
115    pub fn from_name(name: &str) -> Option<Self> {
116        Self::iter()
117            .iter()
118            .copied()
119            .find(|base| base.base_name() == name)
120    }
121
122    /// Splits a full topic name into its base component and suffix (if any).
123    pub fn classify_topic(topic: &str) -> Option<(Self, &str)> {
124        for base in Self::iter() {
125            let base_name = base.base_name();
126            if topic == base_name {
127                return Some((*base, ""));
128            }
129            if let Some(suffix) = topic.strip_prefix(base_name).filter(|s| s.starts_with('_')) {
130                return Some((*base, suffix));
131            }
132        }
133        None
134    }
135
136    fn uses_shard_identifiers(self) -> bool {
137        !matches!(
138            self,
139            Self::MetachainBlocks
140                | Self::PeerAuthentication
141                | Self::Connection
142                | Self::ValidatorInfo
143        )
144    }
145
146    /// Generates all topic names for this base topic with the given shard configuration.
147    pub fn generate_topics(
148        self,
149        shards: &[u32],
150        include_metachain: bool,
151        include_all: bool,
152    ) -> Vec<String> {
153        let mut topics = Vec::new();
154        topics.push(self.base_name().to_owned());
155
156        if self.uses_shard_identifiers() {
157            topics.extend(generate_pair_topics(
158                self.base_name(),
159                shards,
160                include_metachain,
161                include_all,
162            ));
163        }
164
165        topics
166    }
167}
168
169/// Parsed information for a concrete gossip topic.
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct TopicInfo {
172    /// The base topic family (e.g., Transactions, Consensus)
173    pub base: BaseTopic,
174    /// Routing information (Global or specific shard targets)
175    pub routing: TopicRouting,
176}
177
178impl TopicInfo {
179    /// Parses a topic string into base type and routing info (e.g., "`transactions_0_1`" -> Transactions with shards [0, 1]).
180    pub fn parse(topic: &str) -> Option<Self> {
181        let (base, suffix) = BaseTopic::classify_topic(topic)?;
182        let routing = parse_routing_suffix(suffix)?;
183        Some(Self { base, routing })
184    }
185
186    /// Cached version of `parse()` for hot paths.
187    ///
188    /// Uses a global cache to avoid repeated string parsing. Since `MultiversX` nodes
189    /// only subscribe to a bounded set of topics, the cache remains small (~64 entries).
190    pub fn parse_cached(topic: &str) -> Option<Self> {
191        let cache = get_topic_cache();
192
193        // Fast path: check cache
194        if let Some(cached) = cache.get(topic) {
195            return Some(cached);
196        }
197
198        // Slow path: parse
199        let parsed = Self::parse(topic);
200
201        // Only cache if valid!
202        if let Some(info) = &parsed {
203            cache.insert(topic.to_owned(), info.clone());
204        }
205
206        parsed
207    }
208}
209
210/// Represents how a topic is routed across shards.
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub enum TopicRouting {
213    /// Topic is broadcast to all nodes regardless of shard
214    Global,
215    /// Topic targets specific shard(s)
216    Target(Vec<TopicShard>),
217}
218
219impl TopicRouting {
220    /// Returns true if this is a global topic (not shard-specific).
221    pub fn is_global(&self) -> bool {
222        matches!(self, Self::Global)
223    }
224
225    /// Returns the target shards, or empty slice for global topics.
226    pub fn shards(&self) -> &[TopicShard] {
227        match self {
228            Self::Global => &[],
229            Self::Target(shards) => shards.as_slice(),
230        }
231    }
232}
233
/// One shard identifier taken from a topic's routing suffix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicShard {
    /// Regular shard with a numeric ID (0, 1, 2, etc.).
    Shard(u32),
    /// The metachain (numeric ID `METACHAIN_SHARD_ID`).
    Metachain,
    /// The "all shards" wildcard (numeric ID `ALL_SHARD_ID`).
    All,
}

impl TopicShard {
    /// Parses a single suffix token (the text between two underscores).
    ///
    /// Accepted forms:
    /// - `META` / `ALL` in any ASCII case;
    /// - a decimal `u32`, where the numeric values of `METACHAIN_SHARD_ID` and
    ///   `ALL_SHARD_ID` map to `Metachain` and `All` respectively.
    ///
    /// Anything else yields `None`, including empty tokens, values that
    /// overflow `u32`, and tokens with a sign.
    fn from_token(token: &str) -> Option<Self> {
        if token.eq_ignore_ascii_case("META") {
            return Some(Self::Metachain);
        }
        if token.eq_ignore_ascii_case("ALL") {
            return Some(Self::All);
        }

        // `u32::from_str` tolerates a leading `+`, which would let a topic
        // such as "transactions_+1" pass as shard 1. Shard identifiers are
        // plain decimal digits, so reject everything else up front.
        if !token.bytes().all(|byte| byte.is_ascii_digit()) {
            return None;
        }

        let id = token.parse::<u32>().ok()?;
        Some(match id {
            METACHAIN_SHARD_ID => Self::Metachain,
            ALL_SHARD_ID => Self::All,
            other => Self::Shard(other),
        })
    }

    /// Returns the numeric shard ID, or `None` for the `Metachain` and `All`
    /// variants.
    pub fn as_u32(&self) -> Option<u32> {
        match self {
            Self::Shard(value) => Some(*value),
            Self::Metachain | Self::All => None,
        }
    }
}
268
269/// Parses a topic routing suffix into a `TopicRouting` value.
270///
271/// An empty suffix returns `TopicRouting::Global`. Otherwise, the suffix is split
272/// by underscores and each token is parsed as a `TopicShard`.
273fn parse_routing_suffix(suffix: &str) -> Option<TopicRouting> {
274    if suffix.is_empty() {
275        return Some(TopicRouting::Global);
276    }
277
278    let trimmed = suffix.strip_prefix('_')?;
279    if trimmed.is_empty() {
280        return Some(TopicRouting::Global);
281    }
282
283    let mut shards = Vec::new();
284    for token in trimmed.split('_') {
285        let shard = TopicShard::from_token(token)?;
286        shards.push(shard);
287    }
288
289    Some(TopicRouting::Target(shards))
290}
291
292/// Converts a shard ID to its topic suffix string representation.
293///
294/// Mirrors Go's `ShardIdToString` from mx-chain-go.
295fn shard_id_to_string(shard_id: u32) -> String {
296    match shard_id {
297        METACHAIN_SHARD_ID => "_META".to_owned(),
298        ALL_SHARD_ID => "_ALL".to_owned(),
299        _ => format!("_{shard_id}"),
300    }
301}
302
303/// Mirrors Go's `CommunicationIdentifierBetweenShards`.
304pub fn communication_identifier_between_shards(shard_id1: u32, shard_id2: u32) -> String {
305    if shard_id1 == ALL_SHARD_ID || shard_id2 == ALL_SHARD_ID {
306        return shard_id_to_string(ALL_SHARD_ID);
307    }
308
309    if shard_id1 == shard_id2 {
310        return shard_id_to_string(shard_id1);
311    }
312
313    if shard_id1 < shard_id2 {
314        return format!(
315            "{}{}",
316            shard_id_to_string(shard_id1),
317            shard_id_to_string(shard_id2)
318        );
319    }
320
321    format!(
322        "{}{}",
323        shard_id_to_string(shard_id2),
324        shard_id_to_string(shard_id1)
325    )
326}
327
328/// Builds a gossipsub topic name for a shard pair (e.g., "`transactions_0_1`").
329/// Shards are automatically sorted to ensure canonical ordering per `MultiversX` protocol.
330pub fn broadcast_topic(base: &str, shard_id1: u32, shard_id2: u32) -> String {
331    format!(
332        "{base}{}",
333        communication_identifier_between_shards(shard_id1, shard_id2)
334    )
335}
336
337/// Convenience wrapper generating all transaction topics for the provided shard set.
338pub fn transaction_topics_from_shards(
339    shards: &[u32],
340    include_metachain: bool,
341    include_all: bool,
342) -> Vec<String> {
343    let mut topics = generate_pair_topics(
344        TRANSACTIONS_BASE_TOPIC,
345        shards,
346        include_metachain,
347        include_all,
348    );
349    topics.sort();
350    topics
351}
352
353/// Generates every known topic (including control channels) for the provided shard set.
354pub fn all_topics_for_shards(
355    shards: &[u32],
356    include_metachain: bool,
357    include_all: bool,
358) -> Vec<String> {
359    let mut seen = HashSet::new();
360    let mut topics = Vec::new();
361
362    for base in BaseTopic::iter() {
363        for topic in base.generate_topics(shards, include_metachain, include_all) {
364            if seen.insert(topic.clone()) {
365                topics.push(topic);
366            }
367        }
368    }
369
370    topics.sort();
371    topics
372}
373
374/// Generates all pairwise shard topic combinations for the given base topic.
375///
376/// Creates topics for every unique (`shard_a`, `shard_b`) pair where `shard_a` <= `shard_b`.
377/// Optionally includes metachain and ALL shard variants.
378fn generate_pair_topics(
379    base: &str,
380    shards: &[u32],
381    include_metachain: bool,
382    include_all: bool,
383) -> Vec<String> {
384    let mut shard_ids: Vec<u32> = shards.to_vec();
385    if include_metachain {
386        shard_ids.push(METACHAIN_SHARD_ID);
387    }
388    shard_ids.sort_unstable();
389    shard_ids.dedup();
390
391    let mut seen = HashSet::new();
392    let mut topics = Vec::new();
393
394    for (idx, shard_a) in shard_ids.iter().enumerate() {
395        for shard_b in &shard_ids[idx..] {
396            let topic = broadcast_topic(base, *shard_a, *shard_b);
397            if seen.insert(topic.clone()) {
398                topics.push(topic);
399            }
400        }
401    }
402
403    if include_all {
404        let topic = broadcast_topic(base, ALL_SHARD_ID, ALL_SHARD_ID);
405        if seen.insert(topic.clone()) {
406            topics.push(topic);
407        }
408    }
409
410    topics
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `topics` contains exactly `name`.
    fn has_topic(topics: &[String], name: &str) -> bool {
        topics.iter().any(|topic| topic == name)
    }

    /// Shorthand for a routing that targets exactly the given shards.
    fn targets(shards: &[TopicShard]) -> TopicRouting {
        TopicRouting::Target(shards.to_vec())
    }

    #[test]
    fn communication_identifier_matches_go_logic() {
        let cases = [
            ((0, 1), "_0_1"),
            ((2, 0), "_0_2"),
            ((METACHAIN_SHARD_ID, 1), "_1_META"),
            ((ALL_SHARD_ID, 0), "_ALL"),
            ((2, 2), "_2"),
        ];

        for ((first, second), expected) in cases {
            assert_eq!(
                communication_identifier_between_shards(first, second),
                expected
            );
        }
    }

    #[test]
    fn transaction_topics_cover_all_pairs() {
        let topics = transaction_topics_from_shards(&[0, 1, 2], true, true);

        for expected in [
            "transactions_0",
            "transactions_0_1",
            "transactions_1_2",
            "transactions_0_META",
            "transactions_META",
            "transactions_ALL",
        ] {
            assert!(has_topic(&topics, expected));
        }
    }

    #[test]
    fn all_topics_include_control_channels() {
        let topics = all_topics_for_shards(&[0, 1], true, true);

        for exact in ["connection", "peerAuthentication", "metachainBlocks"] {
            assert!(has_topic(&topics, exact));
        }
        for prefix in ["consensus_", "transactions_"] {
            assert!(topics.iter().any(|t| t.starts_with(prefix)));
        }
    }

    #[test]
    fn topic_info_parses_cross_shard_transactions() {
        let info = TopicInfo::parse("transactions_0_2").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            targets(&[TopicShard::Shard(0), TopicShard::Shard(2)]),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    #[test]
    fn topic_info_parses_meta_route() {
        let info = TopicInfo::parse("transactions_META").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            targets(&[TopicShard::Metachain]),
            "unexpected routing: {:?}",
            info.routing
        );
    }

    #[test]
    fn topic_info_parses_global_topic() {
        let info = TopicInfo::parse("validatorInfo").expect("parse");
        assert_eq!(info.base, BaseTopic::ValidatorInfo);
        assert!(info.routing.is_global());
    }

    #[test]
    fn topic_shard_from_numeric_metachain() {
        // The decimal form of METACHAIN_SHARD_ID (4294967295) must be
        // recognised as the metachain, not as TopicShard::Shard(0xFFFFFFFF).
        let info = TopicInfo::parse("transactions_4294967295").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            targets(&[TopicShard::Metachain]),
            "expected Metachain, got {:?}",
            info.routing
        );
    }

    #[test]
    fn topic_shard_from_numeric_all() {
        // The decimal form of ALL_SHARD_ID (4294967280) must be recognised as
        // the wildcard, not as TopicShard::Shard(0xFFFFFFF0).
        let info = TopicInfo::parse("transactions_4294967280").expect("parse");
        assert_eq!(info.base, BaseTopic::Transactions);
        assert_eq!(
            info.routing,
            targets(&[TopicShard::All]),
            "expected All, got {:?}",
            info.routing
        );
    }

    #[test]
    fn cache_bounds_memory_usage() {
        let valid_topics: Vec<String> = (0..100)
            .map(|i| format!("transactions_{}_META", i))
            .collect();
        let initial_len = get_cache_len();

        // Step 1: topics that fail to parse must never be stored.
        (0..1000)
            .map(|i| format!("invalid_topic_{}", i))
            .for_each(|topic| assert!(TopicInfo::parse_cached(&topic).is_none()));

        assert_eq!(
            get_cache_len(),
            initial_len,
            "Invalid topics should not be cached"
        );

        // Step 2: topics that parse successfully are stored.
        for topic in &valid_topics {
            assert!(TopicInfo::parse_cached(topic).is_some());
        }

        let len_after_valid = get_cache_len();
        assert!(
            len_after_valid > initial_len,
            "Valid topics should be cached"
        );
        // The reported size may trail the number of inserts, but it can never
        // exceed the number of distinct topics inserted.
        assert!(
            len_after_valid <= initial_len + 100,
            "Cache size shouldn't exceed input count"
        );

        // Step 3: looking the same topics up again must not add entries.
        for topic in &valid_topics {
            let _ = TopicInfo::parse_cached(topic);
        }

        let len_final = get_cache_len();
        // The size may still catch up to the number of unique topics, but it
        // must stay bounded by it rather than grow with every access.
        assert!(
            len_final <= initial_len + 100,
            "Cache grew unbounded on re-access: {} -> {}",
            len_after_valid,
            len_final
        );
    }
}