1use serde::{Deserialize, Serialize};
55use std::collections::{HashMap, HashSet};
56use std::sync::Arc;
57use tokio::sync::RwLock;
58use tracing::{debug, info, warn};
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct TopicInfo {
67 pub topic: String,
69
70 pub message_type: String,
72
73 pub publishers: Vec<String>,
75
76 pub subscribers: Vec<String>,
78
79 pub created_at: u64,
81
82 pub last_publisher_added: Option<u64>,
84
85 pub last_subscriber_added: Option<u64>,
87
88 pub description: Option<String>,
90
91 pub metadata: HashMap<String, String>,
93}
94
95impl TopicInfo {
96 pub fn new(topic: &str, message_type: &str) -> Self {
98 Self {
99 topic: topic.to_string(),
100 message_type: message_type.to_string(),
101 publishers: Vec::new(),
102 subscribers: Vec::new(),
103 created_at: now_micros(),
104 last_publisher_added: None,
105 last_subscriber_added: None,
106 description: None,
107 metadata: HashMap::new(),
108 }
109 }
110
111 pub fn add_publisher(&mut self, node_id: &str) {
113 if !self.publishers.contains(&node_id.to_string()) {
114 self.publishers.push(node_id.to_string());
115 self.last_publisher_added = Some(now_micros());
116 }
117 }
118
119 pub fn add_subscriber(&mut self, node_id: &str) {
121 if !self.subscribers.contains(&node_id.to_string()) {
122 self.subscribers.push(node_id.to_string());
123 self.last_subscriber_added = Some(now_micros());
124 }
125 }
126
127 pub fn remove_publisher(&mut self, node_id: &str) {
129 self.publishers.retain(|p| p != node_id);
130 }
131
132 pub fn remove_subscriber(&mut self, node_id: &str) {
134 self.subscribers.retain(|s| s != node_id);
135 }
136
137 pub fn has_publishers(&self) -> bool {
139 !self.publishers.is_empty()
140 }
141
142 pub fn has_subscribers(&self) -> bool {
144 !self.subscribers.is_empty()
145 }
146
147 pub fn is_orphaned(&self) -> bool {
149 self.publishers.is_empty() || self.subscribers.is_empty()
150 }
151
152 pub fn publisher_count(&self) -> usize {
154 self.publishers.len()
155 }
156
157 pub fn subscriber_count(&self) -> usize {
159 self.subscribers.len()
160 }
161}
162
163#[derive(Clone)]
172pub struct TopicRegistry {
173 topics: Arc<RwLock<HashMap<String, TopicInfo>>>,
175
176 node_publishers: Arc<RwLock<HashMap<String, HashSet<String>>>>,
178
179 node_subscribers: Arc<RwLock<HashMap<String, HashSet<String>>>>,
181}
182
183impl TopicRegistry {
184 pub fn new() -> Self {
186 info!("Creating new topic registry");
187 Self {
188 topics: Arc::new(RwLock::new(HashMap::new())),
189 node_publishers: Arc::new(RwLock::new(HashMap::new())),
190 node_subscribers: Arc::new(RwLock::new(HashMap::new())),
191 }
192 }
193
194 pub async fn register_publisher(&self, topic: &str, node_id: &str, message_type: &str) {
202 let mut topics = self.topics.write().await;
203
204 let topic_info = topics
205 .entry(topic.to_string())
206 .or_insert_with(|| TopicInfo::new(topic, message_type));
207
208 topic_info.add_publisher(node_id);
209
210 let mut node_pubs = self.node_publishers.write().await;
212 node_pubs
213 .entry(node_id.to_string())
214 .or_default()
215 .insert(topic.to_string());
216
217 debug!(
218 "Registered publisher: node='{}', topic='{}', type='{}'",
219 node_id, topic, message_type
220 );
221 }
222
223 pub async fn register_subscriber(&self, topic: &str, node_id: &str, message_type: &str) {
231 let mut topics = self.topics.write().await;
232
233 let topic_info = topics
234 .entry(topic.to_string())
235 .or_insert_with(|| TopicInfo::new(topic, message_type));
236
237 topic_info.add_subscriber(node_id);
238
239 let mut node_subs = self.node_subscribers.write().await;
241 node_subs
242 .entry(node_id.to_string())
243 .or_default()
244 .insert(topic.to_string());
245
246 debug!(
247 "Registered subscriber: node='{}', topic='{}', type='{}'",
248 node_id, topic, message_type
249 );
250 }
251
252 pub async fn unregister_publisher(&self, topic: &str, node_id: &str) {
254 let mut topics = self.topics.write().await;
255
256 if let Some(info) = topics.get_mut(topic) {
257 info.remove_publisher(node_id);
258
259 if info.publishers.is_empty() && info.subscribers.is_empty() {
261 topics.remove(topic);
262 debug!("Removed topic '{}' (no publishers or subscribers)", topic);
263 }
264 }
265
266 let mut node_pubs = self.node_publishers.write().await;
268 if let Some(topics_set) = node_pubs.get_mut(node_id) {
269 topics_set.remove(topic);
270 if topics_set.is_empty() {
271 node_pubs.remove(node_id);
272 }
273 }
274
275 debug!("Unregistered publisher: node='{}', topic='{}'", node_id, topic);
276 }
277
278 pub async fn unregister_subscriber(&self, topic: &str, node_id: &str) {
280 let mut topics = self.topics.write().await;
281
282 if let Some(info) = topics.get_mut(topic) {
283 info.remove_subscriber(node_id);
284
285 if info.publishers.is_empty() && info.subscribers.is_empty() {
287 topics.remove(topic);
288 debug!("Removed topic '{}' (no publishers or subscribers)", topic);
289 }
290 }
291
292 let mut node_subs = self.node_subscribers.write().await;
294 if let Some(topics_set) = node_subs.get_mut(node_id) {
295 topics_set.remove(topic);
296 if topics_set.is_empty() {
297 node_subs.remove(node_id);
298 }
299 }
300
301 debug!("Unregistered subscriber: node='{}', topic='{}'", node_id, topic);
302 }
303
304 pub async fn unregister_node(&self, node_id: &str) {
306 let topics_published = {
308 let node_pubs = self.node_publishers.read().await;
309 node_pubs.get(node_id).cloned().unwrap_or_default()
310 };
311
312 for topic in topics_published {
313 self.unregister_publisher(&topic, node_id).await;
314 }
315
316 let topics_subscribed = {
318 let node_subs = self.node_subscribers.read().await;
319 node_subs.get(node_id).cloned().unwrap_or_default()
320 };
321
322 for topic in topics_subscribed {
323 self.unregister_subscriber(&topic, node_id).await;
324 }
325
326 info!("Unregistered all topics for node '{}'", node_id);
327 }
328
329 pub async fn get_topic(&self, topic: &str) -> Option<TopicInfo> {
331 let topics = self.topics.read().await;
332 topics.get(topic).cloned()
333 }
334
335 pub async fn list_topics(&self) -> Vec<String> {
337 let topics = self.topics.read().await;
338 topics.keys().cloned().collect()
339 }
340
341 pub async fn get_all_topics(&self) -> Vec<TopicInfo> {
343 let topics = self.topics.read().await;
344 topics.values().cloned().collect()
345 }
346
347 pub async fn find_by_message_type(&self, message_type: &str) -> Vec<TopicInfo> {
349 let topics = self.topics.read().await;
350 topics
351 .values()
352 .filter(|info| info.message_type == message_type)
353 .cloned()
354 .collect()
355 }
356
357 pub async fn topics_published_by(&self, node_id: &str) -> Vec<String> {
359 let node_pubs = self.node_publishers.read().await;
360 node_pubs
361 .get(node_id)
362 .cloned()
363 .unwrap_or_default()
364 .into_iter()
365 .collect()
366 }
367
368 pub async fn topics_subscribed_by(&self, node_id: &str) -> Vec<String> {
370 let node_subs = self.node_subscribers.read().await;
371 node_subs
372 .get(node_id)
373 .cloned()
374 .unwrap_or_default()
375 .into_iter()
376 .collect()
377 }
378
379 pub async fn topic_count(&self) -> usize {
381 let topics = self.topics.read().await;
382 topics.len()
383 }
384
385 pub async fn validate(&self) -> Vec<String> {
392 let mut issues = Vec::new();
393 let topics = self.topics.read().await;
394
395 for (topic, info) in topics.iter() {
396 if info.publishers.is_empty() {
397 issues.push(format!(
398 "Topic '{}' has no publishers (subscribed by: {:?})",
399 topic, info.subscribers
400 ));
401 }
402
403 if info.subscribers.is_empty() {
404 issues.push(format!(
405 "Topic '{}' has no subscribers (published by: {:?})",
406 topic, info.publishers
407 ));
408 }
409 }
410
411 if !issues.is_empty() {
412 warn!("Found {} topology issues", issues.len());
413 }
414
415 issues
416 }
417
418 pub async fn stats(&self) -> TopologyStats {
420 let topics = self.topics.read().await;
421
422 let total_topics = topics.len();
423 let total_publishers: usize = topics.values().map(|t| t.publishers.len()).sum();
424 let total_subscribers: usize = topics.values().map(|t| t.subscribers.len()).sum();
425
426 let orphaned_topics = topics.values().filter(|t| t.is_orphaned()).count();
427
428 let topics_without_publishers = topics.values().filter(|t| t.publishers.is_empty()).count();
429
430 let topics_without_subscribers = topics.values().filter(|t| t.subscribers.is_empty()).count();
431
432 TopologyStats {
433 total_topics,
434 total_publishers,
435 total_subscribers,
436 orphaned_topics,
437 topics_without_publishers,
438 topics_without_subscribers,
439 }
440 }
441
442 pub async fn export_dot(&self) -> String {
444 let mut dot = String::from("digraph TopicTopology {\n");
445 dot.push_str(" rankdir=LR;\n");
446 dot.push_str(" node [shape=box];\n\n");
447
448 let topics = self.topics.read().await;
449
450 for (topic, info) in topics.iter() {
452 let label = format!("{}\\n({})", topic, info.message_type);
453 dot.push_str(&format!(
454 " \"{}\" [label=\"{}\", shape=ellipse, style=filled, fillcolor=lightblue];\n",
455 topic, label
456 ));
457 }
458
459 dot.push('\n');
460
461 for (topic, info) in topics.iter() {
463 for pub_node in &info.publishers {
464 dot.push_str(&format!(
465 " \"{}\" -> \"{}\" [label=\"pub\", color=green];\n",
466 pub_node, topic
467 ));
468 }
469
470 for sub_node in &info.subscribers {
471 dot.push_str(&format!(
472 " \"{}\" -> \"{}\" [label=\"sub\", color=blue];\n",
473 topic, sub_node
474 ));
475 }
476 }
477
478 dot.push_str("}\n");
479 dot
480 }
481
482 pub async fn clear(&self) {
484 let mut topics = self.topics.write().await;
485 topics.clear();
486
487 let mut node_pubs = self.node_publishers.write().await;
488 node_pubs.clear();
489
490 let mut node_subs = self.node_subscribers.write().await;
491 node_subs.clear();
492
493 info!("Cleared topic registry");
494 }
495}
496
497impl Default for TopicRegistry {
498 fn default() -> Self {
499 Self::new()
500 }
501}
502
503#[derive(Debug, Clone, Serialize, Deserialize)]
509pub struct TopologyStats {
510 pub total_topics: usize,
512
513 pub total_publishers: usize,
515
516 pub total_subscribers: usize,
518
519 pub orphaned_topics: usize,
521
522 pub topics_without_publishers: usize,
524
525 pub topics_without_subscribers: usize,
527}
528
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Never panics: a system clock set before the epoch yields `0`, and a value
/// too large for `u64` saturates at `u64::MAX` instead of being truncated.
fn now_micros() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` fails only when the clock is earlier than the epoch.
        .map_or(0, |elapsed| {
            // `as_micros` is a `u128`; clamp before narrowing so the cast is lossless.
            elapsed.as_micros().min(u128::from(u64::MAX)) as u64
        })
}