mecha10_core/
topic_registry.rs

1//! Topic Registry for Runtime Introspection and Validation
2//!
3//! Provides a central registry for tracking all active topics, their publishers,
4//! subscribers, and message types. Enables runtime discovery, monitoring, and
5//! debugging of the pub/sub system.
6//!
7//! # Use Cases
8//!
9//! - Discover what topics exist at runtime
10//! - Find which nodes publish/subscribe to a topic
11//! - Validate topic usage (orphaned topics, missing publishers/subscribers)
12//! - Monitor topic activity (message rates, last activity)
13//! - Debug communication issues
14//! - Generate system topology diagrams
15//!
16//! # Example
17//!
18//! ```rust
19//! use mecha10::prelude::*;
20//! use mecha10::topic_registry::{TopicRegistry, TopicInfo};
21//!
22//! # async fn example() -> Result<()> {
23//! let registry = TopicRegistry::new();
24//!
25//! // Register a publisher
26//! registry.register_publisher("/sensor/camera/rgb", "camera_node", "Image").await;
27//!
28//! // Register a subscriber
29//! registry.register_subscriber("/sensor/camera/rgb", "vision_node", "Image").await;
30//!
31//! // Query topic info
32//! if let Some(info) = registry.get_topic("/sensor/camera/rgb").await {
33//!     println!("Topic: {}", info.topic);
34//!     println!("Publishers: {:?}", info.publishers);
35//!     println!("Subscribers: {:?}", info.subscribers);
36//!     println!("Message type: {}", info.message_type);
37//! }
38//!
39//! // List all topics
40//! let topics = registry.list_topics().await;
41//! for topic in topics {
42//!     println!("- {}", topic);
43//! }
44//!
45//! // Validate topology
46//! let issues = registry.validate().await;
47//! for issue in issues {
48//!     println!("Warning: {}", issue);
49//! }
50//! # Ok(())
51//! # }
52//! ```
53
54use serde::{Deserialize, Serialize};
55use std::collections::{HashMap, HashSet};
56use std::sync::Arc;
57use tokio::sync::RwLock;
58use tracing::{debug, info, warn};
59
60// ============================================================================
61// Topic Information
62// ============================================================================
63
64/// Information about a topic
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct TopicInfo {
67    /// Topic path (e.g., "/sensor/camera/rgb")
68    pub topic: String,
69
70    /// Message type name (e.g., "Image", "LaserScan")
71    pub message_type: String,
72
73    /// Nodes that publish to this topic
74    pub publishers: Vec<String>,
75
76    /// Nodes that subscribe to this topic
77    pub subscribers: Vec<String>,
78
79    /// When the topic was first registered
80    pub created_at: u64,
81
82    /// Last time a publisher was added
83    pub last_publisher_added: Option<u64>,
84
85    /// Last time a subscriber was added
86    pub last_subscriber_added: Option<u64>,
87
88    /// Optional description
89    pub description: Option<String>,
90
91    /// Custom metadata
92    pub metadata: HashMap<String, String>,
93}
94
95impl TopicInfo {
96    /// Create new topic info
97    pub fn new(topic: &str, message_type: &str) -> Self {
98        Self {
99            topic: topic.to_string(),
100            message_type: message_type.to_string(),
101            publishers: Vec::new(),
102            subscribers: Vec::new(),
103            created_at: now_micros(),
104            last_publisher_added: None,
105            last_subscriber_added: None,
106            description: None,
107            metadata: HashMap::new(),
108        }
109    }
110
111    /// Add a publisher
112    pub fn add_publisher(&mut self, node_id: &str) {
113        if !self.publishers.contains(&node_id.to_string()) {
114            self.publishers.push(node_id.to_string());
115            self.last_publisher_added = Some(now_micros());
116        }
117    }
118
119    /// Add a subscriber
120    pub fn add_subscriber(&mut self, node_id: &str) {
121        if !self.subscribers.contains(&node_id.to_string()) {
122            self.subscribers.push(node_id.to_string());
123            self.last_subscriber_added = Some(now_micros());
124        }
125    }
126
127    /// Remove a publisher
128    pub fn remove_publisher(&mut self, node_id: &str) {
129        self.publishers.retain(|p| p != node_id);
130    }
131
132    /// Remove a subscriber
133    pub fn remove_subscriber(&mut self, node_id: &str) {
134        self.subscribers.retain(|s| s != node_id);
135    }
136
137    /// Check if topic has any publishers
138    pub fn has_publishers(&self) -> bool {
139        !self.publishers.is_empty()
140    }
141
142    /// Check if topic has any subscribers
143    pub fn has_subscribers(&self) -> bool {
144        !self.subscribers.is_empty()
145    }
146
147    /// Check if topic is orphaned (no publishers or no subscribers)
148    pub fn is_orphaned(&self) -> bool {
149        self.publishers.is_empty() || self.subscribers.is_empty()
150    }
151
152    /// Get the number of publishers
153    pub fn publisher_count(&self) -> usize {
154        self.publishers.len()
155    }
156
157    /// Get the number of subscribers
158    pub fn subscriber_count(&self) -> usize {
159        self.subscribers.len()
160    }
161}
162
163// ============================================================================
164// Topic Registry
165// ============================================================================
166
167/// Central registry for tracking topics at runtime
168///
169/// Thread-safe registry that maintains information about all active topics,
170/// their publishers, and subscribers.
171#[derive(Clone)]
172pub struct TopicRegistry {
173    /// Topic information indexed by topic path
174    topics: Arc<RwLock<HashMap<String, TopicInfo>>>,
175
176    /// Node to published topics mapping
177    node_publishers: Arc<RwLock<HashMap<String, HashSet<String>>>>,
178
179    /// Node to subscribed topics mapping
180    node_subscribers: Arc<RwLock<HashMap<String, HashSet<String>>>>,
181}
182
183impl TopicRegistry {
184    /// Create a new topic registry
185    pub fn new() -> Self {
186        info!("Creating new topic registry");
187        Self {
188            topics: Arc::new(RwLock::new(HashMap::new())),
189            node_publishers: Arc::new(RwLock::new(HashMap::new())),
190            node_subscribers: Arc::new(RwLock::new(HashMap::new())),
191        }
192    }
193
194    /// Register a publisher for a topic
195    ///
196    /// # Arguments
197    ///
198    /// * `topic` - Topic path
199    /// * `node_id` - Publishing node identifier
200    /// * `message_type` - Message type name
201    pub async fn register_publisher(&self, topic: &str, node_id: &str, message_type: &str) {
202        let mut topics = self.topics.write().await;
203
204        let topic_info = topics
205            .entry(topic.to_string())
206            .or_insert_with(|| TopicInfo::new(topic, message_type));
207
208        topic_info.add_publisher(node_id);
209
210        // Update node publishers mapping
211        let mut node_pubs = self.node_publishers.write().await;
212        node_pubs
213            .entry(node_id.to_string())
214            .or_default()
215            .insert(topic.to_string());
216
217        debug!(
218            "Registered publisher: node='{}', topic='{}', type='{}'",
219            node_id, topic, message_type
220        );
221    }
222
223    /// Register a subscriber for a topic
224    ///
225    /// # Arguments
226    ///
227    /// * `topic` - Topic path
228    /// * `node_id` - Subscribing node identifier
229    /// * `message_type` - Message type name
230    pub async fn register_subscriber(&self, topic: &str, node_id: &str, message_type: &str) {
231        let mut topics = self.topics.write().await;
232
233        let topic_info = topics
234            .entry(topic.to_string())
235            .or_insert_with(|| TopicInfo::new(topic, message_type));
236
237        topic_info.add_subscriber(node_id);
238
239        // Update node subscribers mapping
240        let mut node_subs = self.node_subscribers.write().await;
241        node_subs
242            .entry(node_id.to_string())
243            .or_default()
244            .insert(topic.to_string());
245
246        debug!(
247            "Registered subscriber: node='{}', topic='{}', type='{}'",
248            node_id, topic, message_type
249        );
250    }
251
252    /// Unregister a publisher
253    pub async fn unregister_publisher(&self, topic: &str, node_id: &str) {
254        let mut topics = self.topics.write().await;
255
256        if let Some(info) = topics.get_mut(topic) {
257            info.remove_publisher(node_id);
258
259            // Clean up topic if no publishers or subscribers remain
260            if info.publishers.is_empty() && info.subscribers.is_empty() {
261                topics.remove(topic);
262                debug!("Removed topic '{}' (no publishers or subscribers)", topic);
263            }
264        }
265
266        // Update node publishers mapping
267        let mut node_pubs = self.node_publishers.write().await;
268        if let Some(topics_set) = node_pubs.get_mut(node_id) {
269            topics_set.remove(topic);
270            if topics_set.is_empty() {
271                node_pubs.remove(node_id);
272            }
273        }
274
275        debug!("Unregistered publisher: node='{}', topic='{}'", node_id, topic);
276    }
277
278    /// Unregister a subscriber
279    pub async fn unregister_subscriber(&self, topic: &str, node_id: &str) {
280        let mut topics = self.topics.write().await;
281
282        if let Some(info) = topics.get_mut(topic) {
283            info.remove_subscriber(node_id);
284
285            // Clean up topic if no publishers or subscribers remain
286            if info.publishers.is_empty() && info.subscribers.is_empty() {
287                topics.remove(topic);
288                debug!("Removed topic '{}' (no publishers or subscribers)", topic);
289            }
290        }
291
292        // Update node subscribers mapping
293        let mut node_subs = self.node_subscribers.write().await;
294        if let Some(topics_set) = node_subs.get_mut(node_id) {
295            topics_set.remove(topic);
296            if topics_set.is_empty() {
297                node_subs.remove(node_id);
298            }
299        }
300
301        debug!("Unregistered subscriber: node='{}', topic='{}'", node_id, topic);
302    }
303
304    /// Unregister all topics for a node (when node stops)
305    pub async fn unregister_node(&self, node_id: &str) {
306        // Unregister as publisher
307        let topics_published = {
308            let node_pubs = self.node_publishers.read().await;
309            node_pubs.get(node_id).cloned().unwrap_or_default()
310        };
311
312        for topic in topics_published {
313            self.unregister_publisher(&topic, node_id).await;
314        }
315
316        // Unregister as subscriber
317        let topics_subscribed = {
318            let node_subs = self.node_subscribers.read().await;
319            node_subs.get(node_id).cloned().unwrap_or_default()
320        };
321
322        for topic in topics_subscribed {
323            self.unregister_subscriber(&topic, node_id).await;
324        }
325
326        info!("Unregistered all topics for node '{}'", node_id);
327    }
328
329    /// Get information about a specific topic
330    pub async fn get_topic(&self, topic: &str) -> Option<TopicInfo> {
331        let topics = self.topics.read().await;
332        topics.get(topic).cloned()
333    }
334
335    /// List all registered topics
336    pub async fn list_topics(&self) -> Vec<String> {
337        let topics = self.topics.read().await;
338        topics.keys().cloned().collect()
339    }
340
341    /// Get all topic information
342    pub async fn get_all_topics(&self) -> Vec<TopicInfo> {
343        let topics = self.topics.read().await;
344        topics.values().cloned().collect()
345    }
346
347    /// Find topics by message type
348    pub async fn find_by_message_type(&self, message_type: &str) -> Vec<TopicInfo> {
349        let topics = self.topics.read().await;
350        topics
351            .values()
352            .filter(|info| info.message_type == message_type)
353            .cloned()
354            .collect()
355    }
356
357    /// Find topics published by a node
358    pub async fn topics_published_by(&self, node_id: &str) -> Vec<String> {
359        let node_pubs = self.node_publishers.read().await;
360        node_pubs
361            .get(node_id)
362            .cloned()
363            .unwrap_or_default()
364            .into_iter()
365            .collect()
366    }
367
368    /// Find topics subscribed to by a node
369    pub async fn topics_subscribed_by(&self, node_id: &str) -> Vec<String> {
370        let node_subs = self.node_subscribers.read().await;
371        node_subs
372            .get(node_id)
373            .cloned()
374            .unwrap_or_default()
375            .into_iter()
376            .collect()
377    }
378
379    /// Get total number of topics
380    pub async fn topic_count(&self) -> usize {
381        let topics = self.topics.read().await;
382        topics.len()
383    }
384
385    /// Validate the topic topology
386    ///
387    /// Returns a list of validation issues:
388    /// - Topics with no publishers
389    /// - Topics with no subscribers
390    /// - Type mismatches
391    pub async fn validate(&self) -> Vec<String> {
392        let mut issues = Vec::new();
393        let topics = self.topics.read().await;
394
395        for (topic, info) in topics.iter() {
396            if info.publishers.is_empty() {
397                issues.push(format!(
398                    "Topic '{}' has no publishers (subscribed by: {:?})",
399                    topic, info.subscribers
400                ));
401            }
402
403            if info.subscribers.is_empty() {
404                issues.push(format!(
405                    "Topic '{}' has no subscribers (published by: {:?})",
406                    topic, info.publishers
407                ));
408            }
409        }
410
411        if !issues.is_empty() {
412            warn!("Found {} topology issues", issues.len());
413        }
414
415        issues
416    }
417
418    /// Get topology statistics
419    pub async fn stats(&self) -> TopologyStats {
420        let topics = self.topics.read().await;
421
422        let total_topics = topics.len();
423        let total_publishers: usize = topics.values().map(|t| t.publishers.len()).sum();
424        let total_subscribers: usize = topics.values().map(|t| t.subscribers.len()).sum();
425
426        let orphaned_topics = topics.values().filter(|t| t.is_orphaned()).count();
427
428        let topics_without_publishers = topics.values().filter(|t| t.publishers.is_empty()).count();
429
430        let topics_without_subscribers = topics.values().filter(|t| t.subscribers.is_empty()).count();
431
432        TopologyStats {
433            total_topics,
434            total_publishers,
435            total_subscribers,
436            orphaned_topics,
437            topics_without_publishers,
438            topics_without_subscribers,
439        }
440    }
441
442    /// Export topology as DOT graph (for Graphviz visualization)
443    pub async fn export_dot(&self) -> String {
444        let mut dot = String::from("digraph TopicTopology {\n");
445        dot.push_str("  rankdir=LR;\n");
446        dot.push_str("  node [shape=box];\n\n");
447
448        let topics = self.topics.read().await;
449
450        // Add topic nodes
451        for (topic, info) in topics.iter() {
452            let label = format!("{}\\n({})", topic, info.message_type);
453            dot.push_str(&format!(
454                "  \"{}\" [label=\"{}\", shape=ellipse, style=filled, fillcolor=lightblue];\n",
455                topic, label
456            ));
457        }
458
459        dot.push('\n');
460
461        // Add publisher edges
462        for (topic, info) in topics.iter() {
463            for pub_node in &info.publishers {
464                dot.push_str(&format!(
465                    "  \"{}\" -> \"{}\" [label=\"pub\", color=green];\n",
466                    pub_node, topic
467                ));
468            }
469
470            for sub_node in &info.subscribers {
471                dot.push_str(&format!(
472                    "  \"{}\" -> \"{}\" [label=\"sub\", color=blue];\n",
473                    topic, sub_node
474                ));
475            }
476        }
477
478        dot.push_str("}\n");
479        dot
480    }
481
482    /// Clear all registered topics (for testing)
483    pub async fn clear(&self) {
484        let mut topics = self.topics.write().await;
485        topics.clear();
486
487        let mut node_pubs = self.node_publishers.write().await;
488        node_pubs.clear();
489
490        let mut node_subs = self.node_subscribers.write().await;
491        node_subs.clear();
492
493        info!("Cleared topic registry");
494    }
495}
496
497impl Default for TopicRegistry {
498    fn default() -> Self {
499        Self::new()
500    }
501}
502
503// ============================================================================
504// Topology Statistics
505// ============================================================================
506
507/// Statistics about the topic topology
508#[derive(Debug, Clone, Serialize, Deserialize)]
509pub struct TopologyStats {
510    /// Total number of topics
511    pub total_topics: usize,
512
513    /// Total number of publisher connections
514    pub total_publishers: usize,
515
516    /// Total number of subscriber connections
517    pub total_subscribers: usize,
518
519    /// Number of orphaned topics (no publishers OR no subscribers)
520    pub orphaned_topics: usize,
521
522    /// Topics with no publishers
523    pub topics_without_publishers: usize,
524
525    /// Topics with no subscribers
526    pub topics_without_subscribers: usize,
527}
528
529// ============================================================================
530// Helper Functions
531// ============================================================================
532
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Never panics: returns 0 if the system clock reads earlier than the epoch,
/// and saturates at `u64::MAX` instead of silently truncating.
fn now_micros() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX))
}