1use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::paths;
15use crate::services::ConfigService;
16use crate::types::ProjectConfig;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct Topology {
21 pub project_name: String,
22 pub redis: RedisInfo,
23 pub services: Vec<ServiceInfo>,
24 pub nodes: Vec<NodeTopology>,
25 pub topics: Vec<TopicTopology>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct RedisInfo {
31 pub url: String,
32 pub host: String,
33 pub port: u16,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ServiceInfo {
39 pub name: String,
40 pub host: String,
41 pub port: u16,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeTopology {
47 pub name: String,
48 pub package: String,
49 pub enabled: bool,
50 pub description: Option<String>,
51 pub publishes: Vec<TopicRef>,
52 pub subscribes: Vec<TopicRef>,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
57pub struct TopicRef {
58 pub path: String,
59 pub message_type: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct TopicTopology {
65 pub path: String,
66 pub message_type: Option<String>,
67 pub publishers: Vec<String>,
68 pub subscribers: Vec<String>,
69}
70
71pub struct TopologyService {
73 project_root: PathBuf,
74}
75
76impl TopologyService {
77 pub fn new(project_root: PathBuf) -> Self {
79 Self { project_root }
80 }
81
82 pub async fn analyze(&self) -> Result<Topology> {
84 let config_path = self.project_root.join(paths::PROJECT_CONFIG);
86 let config = ConfigService::load_from(&config_path).await?;
87
88 let redis = self.parse_redis_url(&config.environments.redis_url())?;
90
91 let services = self.extract_services(&config);
93
94 let nodes = self.analyze_nodes(&config).await?;
96
97 let topics = self.build_topic_view(&nodes);
99
100 Ok(Topology {
101 project_name: config.name,
102 redis,
103 services,
104 nodes,
105 topics,
106 })
107 }
108
109 pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
113 let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
115
116 if let Some(caps) = url_pattern.captures(url) {
117 let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
118 let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
119
120 Ok(RedisInfo {
121 url: url.to_string(),
122 host: host.to_string(),
123 port,
124 })
125 } else {
126 Ok(RedisInfo {
128 url: url.to_string(),
129 host: "localhost".to_string(),
130 port: 6379,
131 })
132 }
133 }
134
135 fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
137 let mut services = Vec::new();
138
139 if let Some(http_api) = &config.services.http_api {
141 services.push(ServiceInfo {
142 name: "HTTP API".to_string(),
143 host: http_api.host.clone(),
144 port: http_api.port,
145 });
146 }
147
148 if let Some(db) = &config.services.database {
150 if let Some(port) = self.extract_port_from_url(&db.url) {
152 services.push(ServiceInfo {
153 name: "Database".to_string(),
154 host: self
155 .extract_host_from_url(&db.url)
156 .unwrap_or_else(|| "localhost".to_string()),
157 port,
158 });
159 }
160 }
161
162 services
163 }
164
165 fn extract_host_from_url(&self, url: &str) -> Option<String> {
167 let re = Regex::new(r"://([^:/@]+)").ok()?;
168 re.captures(url)
169 .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
170 }
171
172 fn extract_port_from_url(&self, url: &str) -> Option<u16> {
174 let re = Regex::new(r":(\d+)").ok()?;
175 re.captures(url)
176 .and_then(|caps| caps.get(1))
177 .and_then(|m| m.as_str().parse().ok())
178 }
179
180 async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
182 let mut nodes = Vec::new();
183
184 for spec in config.nodes.get_node_specs() {
186 let topology = self.analyze_node(&spec.name, &spec.package_path(), None).await?;
187 nodes.push(topology);
188 }
189
190 Ok(nodes)
191 }
192
193 async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
195 let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
197 let source_path = match self.resolve_node_source(package_path) {
199 Ok(path) if path.exists() => path,
200 _ => return (Vec::new(), Vec::new()),
201 };
202
203 match tokio::task::block_in_place(|| {
205 let rt = tokio::runtime::Handle::current();
206 rt.block_on(self.parse_node_source(&source_path))
207 }) {
208 Ok((pubs, subs)) => (pubs, subs),
209 Err(_) => (Vec::new(), Vec::new()),
210 }
211 });
212
213 Ok(NodeTopology {
214 name: name.to_string(),
215 package: package_path.to_string(),
216 enabled: true,
217 description: description.map(String::from),
218 publishes,
219 subscribes,
220 })
221 }
222
223 async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
263 let mecha10_config_path = self
268 .project_root
269 .join("configs/nodes/@mecha10")
270 .join(node_name)
271 .join("config.json");
272
273 let local_config_path = self
274 .project_root
275 .join("configs/nodes/@local")
276 .join(node_name)
277 .join("config.json");
278
279 let framework_config_path = self
280 .project_root
281 .join(paths::framework::NODES_DIR)
282 .join(node_name)
283 .join("configs/config.json");
284
285 let config_path = if mecha10_config_path.exists() {
286 mecha10_config_path
287 } else if local_config_path.exists() {
288 local_config_path
289 } else if framework_config_path.exists() {
290 framework_config_path
291 } else {
292 anyhow::bail!(
293 "Config file not found at {}, {}, or {}",
294 mecha10_config_path.display(),
295 local_config_path.display(),
296 framework_config_path.display()
297 );
298 };
299
300 let content = tokio::fs::read_to_string(&config_path).await?;
302 let config: serde_json::Value = serde_json::from_str(&content)?;
303
304 let profile = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
306 let env_config = if config.get("dev").is_some() || config.get("production").is_some() {
307 let section = match profile.as_str() {
309 "production" | "prod" => config.get("production"),
310 _ => config.get("dev"),
311 };
312 section.cloned().unwrap_or(config)
313 } else {
314 config
316 };
317
318 Ok(self.parse_topics_from_json(&env_config))
320 }
321
322 fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
324 if package_path.starts_with("./") || package_path.starts_with("../") {
330 let path = self.project_root.join(package_path).join("src/lib.rs");
332 Ok(path)
333 } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
334 let package_name = package_path
336 .strip_prefix("@mecha10/")
337 .or_else(|| package_path.strip_prefix("mecha10-"))
338 .unwrap_or(package_path);
339
340 let package_name = package_name
342 .strip_prefix("nodes-")
343 .or_else(|| package_name.strip_prefix("drivers-"))
344 .unwrap_or(package_name);
345
346 let framework_root = self.find_framework_root()?;
348
349 let nodes_path = framework_root
351 .join(paths::framework::NODES_DIR)
352 .join(package_name)
353 .join("src/lib.rs");
354 if nodes_path.exists() {
355 return Ok(nodes_path);
356 }
357
358 let drivers_path = framework_root
360 .join(paths::framework::DRIVERS_DIR)
361 .join(package_name)
362 .join("src/lib.rs");
363 if drivers_path.exists() {
364 return Ok(drivers_path);
365 }
366
367 let services_path = framework_root
369 .join(paths::framework::SERVICES_DIR)
370 .join(package_name)
371 .join("src/lib.rs");
372 if services_path.exists() {
373 return Ok(services_path);
374 }
375
376 Ok(nodes_path)
378 } else {
379 Ok(self
381 .project_root
382 .join(paths::project::NODES_DIR)
383 .join(package_path)
384 .join("src/lib.rs"))
385 }
386 }
387
388 fn find_framework_root(&self) -> Result<PathBuf> {
390 if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
392 let path = PathBuf::from(framework_path);
393 if path.exists() {
394 return Ok(path);
395 }
396 }
397
398 if self.project_root.join(paths::framework::NODES_DIR).exists() {
400 return Ok(self.project_root.clone());
401 }
402
403 let mut current = self.project_root.clone();
405 loop {
406 if current.join(paths::framework::NODES_DIR).exists() {
407 return Ok(current);
408 }
409
410 match current.parent() {
411 Some(parent) => current = parent.to_path_buf(),
412 None => break,
413 }
414 }
415
416 Ok(self.project_root.clone())
418 }
419
420 async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
422 let content = tokio::fs::read_to_string(source_path)
423 .await
424 .context(format!("Failed to read source file: {}", source_path.display()))?;
425
426 let topic_defs = self.extract_topic_definitions(&content);
428
429 let publishes = self.extract_publish_calls(&content, &topic_defs);
431
432 let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
434
435 Ok((publishes, subscribes))
436 }
437
438 pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
442 let mut topics = HashMap::new();
443
444 let topic_pattern =
446 Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
447
448 for caps in topic_pattern.captures_iter(content) {
449 let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
450 let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
451 let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
452
453 topics.insert(
454 const_name.to_string(),
455 TopicRef {
456 path: topic_path.to_string(),
457 message_type: Some(message_type.to_string()),
458 },
459 );
460 }
461
462 topics
463 }
464
465 pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
469 let mut publishes = Vec::new();
470 let mut seen = HashSet::new();
471
472 let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
477
478 for caps in publish_pattern.captures_iter(content) {
479 if let Some(const_name) = caps.get(1) {
480 let const_name = const_name.as_str();
481 if let Some(topic) = topic_defs.get(const_name) {
482 if seen.insert(topic.path.clone()) {
483 publishes.push(topic.clone());
484 }
485 }
486 }
487 }
488
489 publishes
490 }
491
492 pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
496 let mut subscribes = Vec::new();
497 let mut seen = HashSet::new();
498
499 let subscribe_pattern =
504 Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
505
506 for caps in subscribe_pattern.captures_iter(content) {
507 if let Some(const_name) = caps.get(1) {
508 let const_name = const_name.as_str();
509 if let Some(topic) = topic_defs.get(const_name) {
510 if seen.insert(topic.path.clone()) {
511 subscribes.push(topic.clone());
512 }
513 }
514 }
515 }
516
517 subscribes
518 }
519
520 fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
522 let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
523
524 for node in nodes {
525 for topic_ref in &node.publishes {
527 let topic = topic_map
528 .entry(topic_ref.path.clone())
529 .or_insert_with(|| TopicTopology {
530 path: topic_ref.path.clone(),
531 message_type: topic_ref.message_type.clone(),
532 publishers: Vec::new(),
533 subscribers: Vec::new(),
534 });
535 if !topic.publishers.contains(&node.name) {
536 topic.publishers.push(node.name.clone());
537 }
538 }
539
540 for topic_ref in &node.subscribes {
542 let topic = topic_map
543 .entry(topic_ref.path.clone())
544 .or_insert_with(|| TopicTopology {
545 path: topic_ref.path.clone(),
546 message_type: topic_ref.message_type.clone(),
547 publishers: Vec::new(),
548 subscribers: Vec::new(),
549 });
550 if !topic.subscribers.contains(&node.name) {
551 topic.subscribers.push(node.name.clone());
552 }
553 }
554 }
555
556 let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
557 topics.sort_by(|a, b| a.path.cmp(&b.path));
558 topics
559 }
560
561 fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
566 let mut publishes = Vec::new();
567 let mut subscribes = Vec::new();
568
569 if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
571 if topics.contains_key("publishes") || topics.contains_key("subscribes") {
573 if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
575 for topic in pubs {
576 if let Some(topic_obj) = topic.as_object() {
577 for (_semantic_name, topic_path) in topic_obj {
578 if let Some(path_str) = topic_path.as_str() {
579 publishes.push(TopicRef {
580 path: path_str.to_string(),
581 message_type: None,
582 });
583 }
584 }
585 }
586 }
587 }
588
589 if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
590 for topic in subs {
591 if let Some(topic_obj) = topic.as_object() {
592 for (_semantic_name, topic_path) in topic_obj {
593 if let Some(path_str) = topic_path.as_str() {
594 subscribes.push(TopicRef {
595 path: path_str.to_string(),
596 message_type: None,
597 });
598 }
599 }
600 }
601 }
602 }
603 } else {
604 for (field_name, topic_path) in topics {
606 if let Some(path_str) = topic_path.as_str() {
607 if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
609 subscribes.push(TopicRef {
610 path: path_str.to_string(),
611 message_type: None,
612 });
613 } else if field_name.ends_with("_out")
614 || field_name.ends_with("_pub")
615 || field_name.contains("output")
616 {
617 publishes.push(TopicRef {
618 path: path_str.to_string(),
619 message_type: None,
620 });
621 }
622 }
624 }
625 }
626 }
627
628 if publishes.is_empty() && subscribes.is_empty() {
630 if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
632 subscribes.push(TopicRef {
633 path: input.to_string(),
634 message_type: None,
635 });
636 }
637 if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
638 publishes.push(TopicRef {
639 path: output.to_string(),
640 message_type: None,
641 });
642 }
643 if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
644 subscribes.push(TopicRef {
645 path: control.to_string(),
646 message_type: None,
647 });
648 }
649 }
650
651 (publishes, subscribes)
652 }
653}
654
655#[cfg(test)]
656mod tests {
657 use super::*;
658 use std::path::PathBuf;
659
660 fn create_test_service() -> TopologyService {
661 TopologyService::new(PathBuf::from("/tmp/test"))
662 }
663
664 #[test]
665 fn test_parse_array_format() {
666 let service = create_test_service();
667 let config: serde_json::Value = serde_json::json!({
668 "topics": {
669 "publishes": [
670 { "output": "/vision/classification" },
671 { "status": "/motor/status" }
672 ],
673 "subscribes": [
674 { "input": "/camera/rgb" }
675 ]
676 }
677 });
678
679 let (publishes, subscribes) = service.parse_topics_from_json(&config);
680
681 assert_eq!(publishes.len(), 2);
682 assert_eq!(subscribes.len(), 1);
683 assert_eq!(publishes[0].path, "/vision/classification");
684 assert_eq!(publishes[1].path, "/motor/status");
685 assert_eq!(subscribes[0].path, "/camera/rgb");
686 }
687
688 #[test]
689 fn test_parse_flat_format() {
690 let service = create_test_service();
691 let config: serde_json::Value = serde_json::json!({
692 "topics": {
693 "command_in": "/ai/command",
694 "response_out": "/ai/response",
695 "camera_in": "/camera/rgb",
696 "nav_goal_out": "/nav/goal",
697 "motor_cmd_out": "/motor/cmd_vel",
698 "behavior_out": "/behavior/execute"
699 }
700 });
701
702 let (publishes, subscribes) = service.parse_topics_from_json(&config);
703
704 assert_eq!(publishes.len(), 4);
706 assert!(publishes.iter().any(|t| t.path == "/ai/response"));
707 assert!(publishes.iter().any(|t| t.path == "/nav/goal"));
708 assert!(publishes.iter().any(|t| t.path == "/motor/cmd_vel"));
709 assert!(publishes.iter().any(|t| t.path == "/behavior/execute"));
710
711 assert_eq!(subscribes.len(), 2);
713 assert!(subscribes.iter().any(|t| t.path == "/ai/command"));
714 assert!(subscribes.iter().any(|t| t.path == "/camera/rgb"));
715 }
716
717 #[test]
718 fn test_parse_root_level_format() {
719 let service = create_test_service();
720 let config: serde_json::Value = serde_json::json!({
721 "input_topic": "/robot/sensors/camera/rgb",
722 "output_topic": "/inference/detections",
723 "control_topic": "/inference/cmd"
724 });
725
726 let (publishes, subscribes) = service.parse_topics_from_json(&config);
727
728 assert_eq!(publishes.len(), 1);
729 assert_eq!(subscribes.len(), 2);
730 assert_eq!(publishes[0].path, "/inference/detections");
731 assert!(subscribes.iter().any(|t| t.path == "/robot/sensors/camera/rgb"));
732 assert!(subscribes.iter().any(|t| t.path == "/inference/cmd"));
733 }
734}