1use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::services::ConfigService;
15use crate::types::ProjectConfig;
16
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Complete topology snapshot of a project, as assembled by
/// `TopologyService::analyze`.
pub struct Topology {
    /// Project name, taken from the `name` field of the loaded configuration.
    pub project_name: String,
    /// Redis endpoint parsed from the configured Redis URL.
    pub redis: RedisInfo,
    /// Auxiliary services found in the configuration.
    pub services: Vec<ServiceInfo>,
    /// One entry per node spec, with the topics each node publishes and subscribes to.
    pub nodes: Vec<NodeTopology>,
    /// Topic-centric view derived from `nodes`, sorted by topic path.
    pub topics: Vec<TopicTopology>,
}
26
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Redis connection details produced by `TopologyService::parse_redis_url`.
pub struct RedisInfo {
    /// The URL string exactly as it was passed to the parser.
    pub url: String,
    /// Host extracted from the URL; `"localhost"` when none could be extracted.
    pub host: String,
    /// Port extracted from the URL; `6379` when none could be extracted.
    pub port: u16,
}
34
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Network endpoint of a service declared in the project configuration.
pub struct ServiceInfo {
    /// Display name of the service (this file uses `"HTTP API"` and `"Database"`).
    pub name: String,
    /// Host the service is reachable on.
    pub host: String,
    /// Port the service is reachable on.
    pub port: u16,
}
42
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Topic usage of a single node.
pub struct NodeTopology {
    /// Node name as given by its node spec.
    pub name: String,
    /// Package path string of the node, stored as it was passed to the analysis.
    pub package: String,
    /// Whether the node is enabled; the analysis in this file always sets `true`.
    pub enabled: bool,
    /// Optional free-form description of the node.
    pub description: Option<String>,
    /// Topics this node publishes to.
    pub publishes: Vec<TopicRef>,
    /// Topics this node subscribes to.
    pub subscribes: Vec<TopicRef>,
}
53
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
/// Reference to a topic, optionally annotated with its message type.
pub struct TopicRef {
    /// Topic path, e.g. `/camera/rgb`.
    pub path: String,
    /// Message type name. It is `Some` when the reference was recovered from a
    /// `Topic<T>` constant in source code and `None` when it came from a JSON
    /// config file.
    pub message_type: Option<String>,
}
60
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Topic-centric view: which nodes publish to and subscribe to one topic.
pub struct TopicTopology {
    /// Topic path.
    pub path: String,
    /// Message type taken from the first reference to this topic that was seen.
    pub message_type: Option<String>,
    /// Names of the nodes that publish to this topic (no duplicates).
    pub publishers: Vec<String>,
    /// Names of the nodes that subscribe to this topic (no duplicates).
    pub subscribers: Vec<String>,
}
69
/// Analyzes a project directory and reports its nodes, topics and services.
pub struct TopologyService {
    /// Root directory of the project; `mecha10.json` and the node config
    /// files are looked up relative to it.
    project_root: PathBuf,
}
74
75impl TopologyService {
76 pub fn new(project_root: PathBuf) -> Self {
78 Self { project_root }
79 }
80
81 pub async fn analyze(&self) -> Result<Topology> {
83 let config_path = self.project_root.join("mecha10.json");
85 let config = ConfigService::load_from(&config_path).await?;
86
87 let redis = self.parse_redis_url(&config.redis.url)?;
89
90 let services = self.extract_services(&config);
92
93 let nodes = self.analyze_nodes(&config).await?;
95
96 let topics = self.build_topic_view(&nodes);
98
99 Ok(Topology {
100 project_name: config.name,
101 redis,
102 services,
103 nodes,
104 topics,
105 })
106 }
107
108 pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
112 let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
114
115 if let Some(caps) = url_pattern.captures(url) {
116 let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
117 let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
118
119 Ok(RedisInfo {
120 url: url.to_string(),
121 host: host.to_string(),
122 port,
123 })
124 } else {
125 Ok(RedisInfo {
127 url: url.to_string(),
128 host: "localhost".to_string(),
129 port: 6379,
130 })
131 }
132 }
133
134 fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
136 let mut services = Vec::new();
137
138 if let Some(http_api) = &config.services.http_api {
140 services.push(ServiceInfo {
141 name: "HTTP API".to_string(),
142 host: http_api.host.clone(),
143 port: http_api.port,
144 });
145 }
146
147 if let Some(db) = &config.services.database {
149 if let Some(port) = self.extract_port_from_url(&db.url) {
151 services.push(ServiceInfo {
152 name: "Database".to_string(),
153 host: self
154 .extract_host_from_url(&db.url)
155 .unwrap_or_else(|| "localhost".to_string()),
156 port,
157 });
158 }
159 }
160
161 services
162 }
163
164 fn extract_host_from_url(&self, url: &str) -> Option<String> {
166 let re = Regex::new(r"://([^:/@]+)").ok()?;
167 re.captures(url)
168 .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
169 }
170
171 fn extract_port_from_url(&self, url: &str) -> Option<u16> {
173 let re = Regex::new(r":(\d+)").ok()?;
174 re.captures(url)
175 .and_then(|caps| caps.get(1))
176 .and_then(|m| m.as_str().parse().ok())
177 }
178
179 async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
181 let mut nodes = Vec::new();
182
183 for spec in config.nodes.get_node_specs() {
185 let topology = self.analyze_node(&spec.name, &spec.package_path(), None).await?;
186 nodes.push(topology);
187 }
188
189 Ok(nodes)
190 }
191
192 async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
194 let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
196 let source_path = match self.resolve_node_source(package_path) {
198 Ok(path) if path.exists() => path,
199 _ => return (Vec::new(), Vec::new()),
200 };
201
202 match tokio::task::block_in_place(|| {
204 let rt = tokio::runtime::Handle::current();
205 rt.block_on(self.parse_node_source(&source_path))
206 }) {
207 Ok((pubs, subs)) => (pubs, subs),
208 Err(_) => (Vec::new(), Vec::new()),
209 }
210 });
211
212 Ok(NodeTopology {
213 name: name.to_string(),
214 package: package_path.to_string(),
215 enabled: true,
216 description: description.map(String::from),
217 publishes,
218 subscribes,
219 })
220 }
221
222 async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
262 let user_config_path = self
266 .project_root
267 .join("configs/common/nodes")
268 .join(node_name)
269 .join("config.json");
270
271 let framework_config_path = self
272 .project_root
273 .join("packages/nodes")
274 .join(node_name)
275 .join("configs/common/config.json");
276
277 let config_path = if user_config_path.exists() {
278 user_config_path
279 } else if framework_config_path.exists() {
280 framework_config_path
281 } else {
282 anyhow::bail!(
283 "Config file not found at {} or {}",
284 user_config_path.display(),
285 framework_config_path.display()
286 );
287 };
288
289 let content = tokio::fs::read_to_string(&config_path).await?;
291 let config: serde_json::Value = serde_json::from_str(&content)?;
292
293 Ok(self.parse_topics_from_json(&config))
295 }
296
297 fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
299 if package_path.starts_with("./") || package_path.starts_with("../") {
305 let path = self.project_root.join(package_path).join("src/lib.rs");
307 Ok(path)
308 } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
309 let package_name = package_path
311 .strip_prefix("@mecha10/")
312 .or_else(|| package_path.strip_prefix("mecha10-"))
313 .unwrap_or(package_path);
314
315 let package_name = package_name
317 .strip_prefix("nodes-")
318 .or_else(|| package_name.strip_prefix("drivers-"))
319 .unwrap_or(package_name);
320
321 let framework_root = self.find_framework_root()?;
323
324 let nodes_path = framework_root
326 .join("packages/nodes")
327 .join(package_name)
328 .join("src/lib.rs");
329 if nodes_path.exists() {
330 return Ok(nodes_path);
331 }
332
333 let drivers_path = framework_root
335 .join("packages/drivers")
336 .join(package_name)
337 .join("src/lib.rs");
338 if drivers_path.exists() {
339 return Ok(drivers_path);
340 }
341
342 let services_path = framework_root
344 .join("packages/services")
345 .join(package_name)
346 .join("src/lib.rs");
347 if services_path.exists() {
348 return Ok(services_path);
349 }
350
351 Ok(nodes_path)
353 } else {
354 Ok(self.project_root.join("nodes").join(package_path).join("src/lib.rs"))
356 }
357 }
358
359 fn find_framework_root(&self) -> Result<PathBuf> {
361 if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
363 let path = PathBuf::from(framework_path);
364 if path.exists() {
365 return Ok(path);
366 }
367 }
368
369 if self.project_root.join("packages/nodes").exists() {
371 return Ok(self.project_root.clone());
372 }
373
374 let mut current = self.project_root.clone();
376 loop {
377 if current.join("packages/nodes").exists() {
378 return Ok(current);
379 }
380
381 match current.parent() {
382 Some(parent) => current = parent.to_path_buf(),
383 None => break,
384 }
385 }
386
387 Ok(self.project_root.clone())
389 }
390
391 async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
393 let content = tokio::fs::read_to_string(source_path)
394 .await
395 .context(format!("Failed to read source file: {}", source_path.display()))?;
396
397 let topic_defs = self.extract_topic_definitions(&content);
399
400 let publishes = self.extract_publish_calls(&content, &topic_defs);
402
403 let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
405
406 Ok((publishes, subscribes))
407 }
408
409 pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
413 let mut topics = HashMap::new();
414
415 let topic_pattern =
417 Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
418
419 for caps in topic_pattern.captures_iter(content) {
420 let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
421 let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
422 let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
423
424 topics.insert(
425 const_name.to_string(),
426 TopicRef {
427 path: topic_path.to_string(),
428 message_type: Some(message_type.to_string()),
429 },
430 );
431 }
432
433 topics
434 }
435
436 pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
440 let mut publishes = Vec::new();
441 let mut seen = HashSet::new();
442
443 let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
448
449 for caps in publish_pattern.captures_iter(content) {
450 if let Some(const_name) = caps.get(1) {
451 let const_name = const_name.as_str();
452 if let Some(topic) = topic_defs.get(const_name) {
453 if seen.insert(topic.path.clone()) {
454 publishes.push(topic.clone());
455 }
456 }
457 }
458 }
459
460 publishes
461 }
462
463 pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
467 let mut subscribes = Vec::new();
468 let mut seen = HashSet::new();
469
470 let subscribe_pattern =
475 Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
476
477 for caps in subscribe_pattern.captures_iter(content) {
478 if let Some(const_name) = caps.get(1) {
479 let const_name = const_name.as_str();
480 if let Some(topic) = topic_defs.get(const_name) {
481 if seen.insert(topic.path.clone()) {
482 subscribes.push(topic.clone());
483 }
484 }
485 }
486 }
487
488 subscribes
489 }
490
491 fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
493 let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
494
495 for node in nodes {
496 for topic_ref in &node.publishes {
498 let topic = topic_map
499 .entry(topic_ref.path.clone())
500 .or_insert_with(|| TopicTopology {
501 path: topic_ref.path.clone(),
502 message_type: topic_ref.message_type.clone(),
503 publishers: Vec::new(),
504 subscribers: Vec::new(),
505 });
506 if !topic.publishers.contains(&node.name) {
507 topic.publishers.push(node.name.clone());
508 }
509 }
510
511 for topic_ref in &node.subscribes {
513 let topic = topic_map
514 .entry(topic_ref.path.clone())
515 .or_insert_with(|| TopicTopology {
516 path: topic_ref.path.clone(),
517 message_type: topic_ref.message_type.clone(),
518 publishers: Vec::new(),
519 subscribers: Vec::new(),
520 });
521 if !topic.subscribers.contains(&node.name) {
522 topic.subscribers.push(node.name.clone());
523 }
524 }
525 }
526
527 let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
528 topics.sort_by(|a, b| a.path.cmp(&b.path));
529 topics
530 }
531
532 fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
537 let mut publishes = Vec::new();
538 let mut subscribes = Vec::new();
539
540 if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
542 if topics.contains_key("publishes") || topics.contains_key("subscribes") {
544 if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
546 for topic in pubs {
547 if let Some(topic_obj) = topic.as_object() {
548 for (_semantic_name, topic_path) in topic_obj {
549 if let Some(path_str) = topic_path.as_str() {
550 publishes.push(TopicRef {
551 path: path_str.to_string(),
552 message_type: None,
553 });
554 }
555 }
556 }
557 }
558 }
559
560 if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
561 for topic in subs {
562 if let Some(topic_obj) = topic.as_object() {
563 for (_semantic_name, topic_path) in topic_obj {
564 if let Some(path_str) = topic_path.as_str() {
565 subscribes.push(TopicRef {
566 path: path_str.to_string(),
567 message_type: None,
568 });
569 }
570 }
571 }
572 }
573 }
574 } else {
575 for (field_name, topic_path) in topics {
577 if let Some(path_str) = topic_path.as_str() {
578 if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
580 subscribes.push(TopicRef {
581 path: path_str.to_string(),
582 message_type: None,
583 });
584 } else if field_name.ends_with("_out")
585 || field_name.ends_with("_pub")
586 || field_name.contains("output")
587 {
588 publishes.push(TopicRef {
589 path: path_str.to_string(),
590 message_type: None,
591 });
592 }
593 }
595 }
596 }
597 }
598
599 if publishes.is_empty() && subscribes.is_empty() {
601 if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
603 subscribes.push(TopicRef {
604 path: input.to_string(),
605 message_type: None,
606 });
607 }
608 if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
609 publishes.push(TopicRef {
610 path: output.to_string(),
611 message_type: None,
612 });
613 }
614 if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
615 subscribes.push(TopicRef {
616 path: control.to_string(),
617 message_type: None,
618 });
619 }
620 }
621
622 (publishes, subscribes)
623 }
624}
625
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Service rooted at a dummy path; the JSON parsing tests never touch disk.
    fn create_test_service() -> TopologyService {
        TopologyService::new(PathBuf::from("/tmp/test"))
    }

    /// Projects topic references onto their paths for compact assertions.
    fn paths(topics: &[TopicRef]) -> Vec<&str> {
        topics.iter().map(|topic| topic.path.as_str()).collect()
    }

    /// Array layout: `topics.publishes` / `topics.subscribes` hold objects.
    #[test]
    fn test_parse_array_format() {
        let config = serde_json::json!({
            "topics": {
                "publishes": [
                    { "output": "/vision/classification" },
                    { "status": "/motor/status" }
                ],
                "subscribes": [
                    { "input": "/camera/rgb" }
                ]
            }
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        assert_eq!(paths(&publishes), ["/vision/classification", "/motor/status"]);
        assert_eq!(paths(&subscribes), ["/camera/rgb"]);
    }

    /// Flat layout: direction is inferred from the field-name suffix.
    #[test]
    fn test_parse_flat_format() {
        let config = serde_json::json!({
            "topics": {
                "command_in": "/ai/command",
                "response_out": "/ai/response",
                "camera_in": "/camera/rgb",
                "nav_goal_out": "/nav/goal",
                "motor_cmd_out": "/motor/cmd_vel",
                "behavior_out": "/behavior/execute"
            }
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        // Map iteration order is not part of the contract, so only
        // membership and counts are checked.
        let published = paths(&publishes);
        assert_eq!(published.len(), 4);
        for expected in ["/ai/response", "/nav/goal", "/motor/cmd_vel", "/behavior/execute"].iter() {
            assert!(published.contains(expected));
        }

        let subscribed = paths(&subscribes);
        assert_eq!(subscribed.len(), 2);
        for expected in ["/ai/command", "/camera/rgb"].iter() {
            assert!(subscribed.contains(expected));
        }
    }

    /// Root-level layout: `input_topic`, `output_topic`, `control_topic`.
    #[test]
    fn test_parse_root_level_format() {
        let config = serde_json::json!({
            "input_topic": "/robot/sensors/camera/rgb",
            "output_topic": "/inference/detections",
            "control_topic": "/inference/cmd"
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        assert_eq!(paths(&publishes), ["/inference/detections"]);

        let subscribed = paths(&subscribes);
        assert_eq!(subscribed.len(), 2);
        assert!(subscribed.contains(&"/robot/sensors/camera/rgb"));
        assert!(subscribed.contains(&"/inference/cmd"));
    }
}