1use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::services::ConfigService;
15use crate::types::ProjectConfig;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct Topology {
20 pub project_name: String,
21 pub redis: RedisInfo,
22 pub services: Vec<ServiceInfo>,
23 pub nodes: Vec<NodeTopology>,
24 pub topics: Vec<TopicTopology>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct RedisInfo {
30 pub url: String,
31 pub host: String,
32 pub port: u16,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ServiceInfo {
38 pub name: String,
39 pub host: String,
40 pub port: u16,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct NodeTopology {
46 pub name: String,
47 pub package: String,
48 pub enabled: bool,
49 pub description: Option<String>,
50 pub publishes: Vec<TopicRef>,
51 pub subscribes: Vec<TopicRef>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
56pub struct TopicRef {
57 pub path: String,
58 pub message_type: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TopicTopology {
64 pub path: String,
65 pub message_type: Option<String>,
66 pub publishers: Vec<String>,
67 pub subscribers: Vec<String>,
68}
69
70pub struct TopologyService {
72 project_root: PathBuf,
73}
74
75impl TopologyService {
76 pub fn new(project_root: PathBuf) -> Self {
78 Self { project_root }
79 }
80
81 pub async fn analyze(&self) -> Result<Topology> {
83 let config_path = self.project_root.join("mecha10.json");
85 let config = ConfigService::load_from(&config_path).await?;
86
87 let redis = self.parse_redis_url(&config.redis.url)?;
89
90 let services = self.extract_services(&config);
92
93 let nodes = self.analyze_nodes(&config).await?;
95
96 let topics = self.build_topic_view(&nodes);
98
99 Ok(Topology {
100 project_name: config.name,
101 redis,
102 services,
103 nodes,
104 topics,
105 })
106 }
107
108 pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
112 let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
114
115 if let Some(caps) = url_pattern.captures(url) {
116 let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
117 let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
118
119 Ok(RedisInfo {
120 url: url.to_string(),
121 host: host.to_string(),
122 port,
123 })
124 } else {
125 Ok(RedisInfo {
127 url: url.to_string(),
128 host: "localhost".to_string(),
129 port: 6379,
130 })
131 }
132 }
133
134 fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
136 let mut services = Vec::new();
137
138 if let Some(http_api) = &config.services.http_api {
140 services.push(ServiceInfo {
141 name: "HTTP API".to_string(),
142 host: http_api.host.clone(),
143 port: http_api.port,
144 });
145 }
146
147 if let Some(db) = &config.services.database {
149 if let Some(port) = self.extract_port_from_url(&db.url) {
151 services.push(ServiceInfo {
152 name: "Database".to_string(),
153 host: self
154 .extract_host_from_url(&db.url)
155 .unwrap_or_else(|| "localhost".to_string()),
156 port,
157 });
158 }
159 }
160
161 services
162 }
163
164 fn extract_host_from_url(&self, url: &str) -> Option<String> {
166 let re = Regex::new(r"://([^:/@]+)").ok()?;
167 re.captures(url)
168 .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
169 }
170
171 fn extract_port_from_url(&self, url: &str) -> Option<u16> {
173 let re = Regex::new(r":(\d+)").ok()?;
174 re.captures(url)
175 .and_then(|caps| caps.get(1))
176 .and_then(|m| m.as_str().parse().ok())
177 }
178
179 async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
181 let mut nodes = Vec::new();
182
183 for node in &config.nodes.drivers {
185 if node.enabled {
186 let topology = self
187 .analyze_node(&node.name, &node.path, node.description.as_deref())
188 .await?;
189 nodes.push(topology);
190 }
191 }
192
193 for node in &config.nodes.custom {
195 if node.enabled {
196 let topology = self
197 .analyze_node(&node.name, &node.path, node.description.as_deref())
198 .await?;
199 nodes.push(topology);
200 }
201 }
202
203 Ok(nodes)
204 }
205
206 async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
208 let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
210 let source_path = match self.resolve_node_source(package_path) {
212 Ok(path) if path.exists() => path,
213 _ => return (Vec::new(), Vec::new()),
214 };
215
216 match tokio::task::block_in_place(|| {
218 let rt = tokio::runtime::Handle::current();
219 rt.block_on(self.parse_node_source(&source_path))
220 }) {
221 Ok((pubs, subs)) => (pubs, subs),
222 Err(_) => (Vec::new(), Vec::new()),
223 }
224 });
225
226 Ok(NodeTopology {
227 name: name.to_string(),
228 package: package_path.to_string(),
229 enabled: true,
230 description: description.map(String::from),
231 publishes,
232 subscribes,
233 })
234 }
235
236 async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
276 let user_config_path = self
280 .project_root
281 .join("configs/common/nodes")
282 .join(node_name)
283 .join("config.json");
284
285 let framework_config_path = self
286 .project_root
287 .join("packages/nodes")
288 .join(node_name)
289 .join("configs/common/config.json");
290
291 let config_path = if user_config_path.exists() {
292 user_config_path
293 } else if framework_config_path.exists() {
294 framework_config_path
295 } else {
296 anyhow::bail!(
297 "Config file not found at {} or {}",
298 user_config_path.display(),
299 framework_config_path.display()
300 );
301 };
302
303 let content = tokio::fs::read_to_string(&config_path).await?;
305 let config: serde_json::Value = serde_json::from_str(&content)?;
306
307 Ok(self.parse_topics_from_json(&config))
309 }
310
311 fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
313 if package_path.starts_with("./") || package_path.starts_with("../") {
319 let path = self.project_root.join(package_path).join("src/lib.rs");
321 Ok(path)
322 } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
323 let package_name = package_path
325 .strip_prefix("@mecha10/")
326 .or_else(|| package_path.strip_prefix("mecha10-"))
327 .unwrap_or(package_path);
328
329 let package_name = package_name
331 .strip_prefix("nodes-")
332 .or_else(|| package_name.strip_prefix("drivers-"))
333 .unwrap_or(package_name);
334
335 let framework_root = self.find_framework_root()?;
337
338 let nodes_path = framework_root
340 .join("packages/nodes")
341 .join(package_name)
342 .join("src/lib.rs");
343 if nodes_path.exists() {
344 return Ok(nodes_path);
345 }
346
347 let drivers_path = framework_root
349 .join("packages/drivers")
350 .join(package_name)
351 .join("src/lib.rs");
352 if drivers_path.exists() {
353 return Ok(drivers_path);
354 }
355
356 let services_path = framework_root
358 .join("packages/services")
359 .join(package_name)
360 .join("src/lib.rs");
361 if services_path.exists() {
362 return Ok(services_path);
363 }
364
365 Ok(nodes_path)
367 } else {
368 Ok(self.project_root.join("nodes").join(package_path).join("src/lib.rs"))
370 }
371 }
372
373 fn find_framework_root(&self) -> Result<PathBuf> {
375 if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
377 let path = PathBuf::from(framework_path);
378 if path.exists() {
379 return Ok(path);
380 }
381 }
382
383 if self.project_root.join("packages/nodes").exists() {
385 return Ok(self.project_root.clone());
386 }
387
388 let mut current = self.project_root.clone();
390 loop {
391 if current.join("packages/nodes").exists() {
392 return Ok(current);
393 }
394
395 match current.parent() {
396 Some(parent) => current = parent.to_path_buf(),
397 None => break,
398 }
399 }
400
401 Ok(self.project_root.clone())
403 }
404
405 async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
407 let content = tokio::fs::read_to_string(source_path)
408 .await
409 .context(format!("Failed to read source file: {}", source_path.display()))?;
410
411 let topic_defs = self.extract_topic_definitions(&content);
413
414 let publishes = self.extract_publish_calls(&content, &topic_defs);
416
417 let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
419
420 Ok((publishes, subscribes))
421 }
422
423 pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
427 let mut topics = HashMap::new();
428
429 let topic_pattern =
431 Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
432
433 for caps in topic_pattern.captures_iter(content) {
434 let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
435 let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
436 let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
437
438 topics.insert(
439 const_name.to_string(),
440 TopicRef {
441 path: topic_path.to_string(),
442 message_type: Some(message_type.to_string()),
443 },
444 );
445 }
446
447 topics
448 }
449
450 pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
454 let mut publishes = Vec::new();
455 let mut seen = HashSet::new();
456
457 let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
462
463 for caps in publish_pattern.captures_iter(content) {
464 if let Some(const_name) = caps.get(1) {
465 let const_name = const_name.as_str();
466 if let Some(topic) = topic_defs.get(const_name) {
467 if seen.insert(topic.path.clone()) {
468 publishes.push(topic.clone());
469 }
470 }
471 }
472 }
473
474 publishes
475 }
476
477 pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
481 let mut subscribes = Vec::new();
482 let mut seen = HashSet::new();
483
484 let subscribe_pattern =
489 Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
490
491 for caps in subscribe_pattern.captures_iter(content) {
492 if let Some(const_name) = caps.get(1) {
493 let const_name = const_name.as_str();
494 if let Some(topic) = topic_defs.get(const_name) {
495 if seen.insert(topic.path.clone()) {
496 subscribes.push(topic.clone());
497 }
498 }
499 }
500 }
501
502 subscribes
503 }
504
505 fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
507 let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
508
509 for node in nodes {
510 for topic_ref in &node.publishes {
512 let topic = topic_map
513 .entry(topic_ref.path.clone())
514 .or_insert_with(|| TopicTopology {
515 path: topic_ref.path.clone(),
516 message_type: topic_ref.message_type.clone(),
517 publishers: Vec::new(),
518 subscribers: Vec::new(),
519 });
520 if !topic.publishers.contains(&node.name) {
521 topic.publishers.push(node.name.clone());
522 }
523 }
524
525 for topic_ref in &node.subscribes {
527 let topic = topic_map
528 .entry(topic_ref.path.clone())
529 .or_insert_with(|| TopicTopology {
530 path: topic_ref.path.clone(),
531 message_type: topic_ref.message_type.clone(),
532 publishers: Vec::new(),
533 subscribers: Vec::new(),
534 });
535 if !topic.subscribers.contains(&node.name) {
536 topic.subscribers.push(node.name.clone());
537 }
538 }
539 }
540
541 let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
542 topics.sort_by(|a, b| a.path.cmp(&b.path));
543 topics
544 }
545
546 fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
551 let mut publishes = Vec::new();
552 let mut subscribes = Vec::new();
553
554 if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
556 if topics.contains_key("publishes") || topics.contains_key("subscribes") {
558 if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
560 for topic in pubs {
561 if let Some(topic_obj) = topic.as_object() {
562 for (_semantic_name, topic_path) in topic_obj {
563 if let Some(path_str) = topic_path.as_str() {
564 publishes.push(TopicRef {
565 path: path_str.to_string(),
566 message_type: None,
567 });
568 }
569 }
570 }
571 }
572 }
573
574 if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
575 for topic in subs {
576 if let Some(topic_obj) = topic.as_object() {
577 for (_semantic_name, topic_path) in topic_obj {
578 if let Some(path_str) = topic_path.as_str() {
579 subscribes.push(TopicRef {
580 path: path_str.to_string(),
581 message_type: None,
582 });
583 }
584 }
585 }
586 }
587 }
588 } else {
589 for (field_name, topic_path) in topics {
591 if let Some(path_str) = topic_path.as_str() {
592 if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
594 subscribes.push(TopicRef {
595 path: path_str.to_string(),
596 message_type: None,
597 });
598 } else if field_name.ends_with("_out")
599 || field_name.ends_with("_pub")
600 || field_name.contains("output")
601 {
602 publishes.push(TopicRef {
603 path: path_str.to_string(),
604 message_type: None,
605 });
606 }
607 }
609 }
610 }
611 }
612
613 if publishes.is_empty() && subscribes.is_empty() {
615 if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
617 subscribes.push(TopicRef {
618 path: input.to_string(),
619 message_type: None,
620 });
621 }
622 if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
623 publishes.push(TopicRef {
624 path: output.to_string(),
625 message_type: None,
626 });
627 }
628 if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
629 subscribes.push(TopicRef {
630 path: control.to_string(),
631 message_type: None,
632 });
633 }
634 }
635
636 (publishes, subscribes)
637 }
638}
639
640#[cfg(test)]
641mod tests {
642 use super::*;
643 use std::path::PathBuf;
644
645 fn create_test_service() -> TopologyService {
646 TopologyService::new(PathBuf::from("/tmp/test"))
647 }
648
649 #[test]
650 fn test_parse_array_format() {
651 let service = create_test_service();
652 let config: serde_json::Value = serde_json::json!({
653 "topics": {
654 "publishes": [
655 { "output": "/vision/classification" },
656 { "status": "/motor/status" }
657 ],
658 "subscribes": [
659 { "input": "/camera/rgb" }
660 ]
661 }
662 });
663
664 let (publishes, subscribes) = service.parse_topics_from_json(&config);
665
666 assert_eq!(publishes.len(), 2);
667 assert_eq!(subscribes.len(), 1);
668 assert_eq!(publishes[0].path, "/vision/classification");
669 assert_eq!(publishes[1].path, "/motor/status");
670 assert_eq!(subscribes[0].path, "/camera/rgb");
671 }
672
673 #[test]
674 fn test_parse_flat_format() {
675 let service = create_test_service();
676 let config: serde_json::Value = serde_json::json!({
677 "topics": {
678 "command_in": "/ai/command",
679 "response_out": "/ai/response",
680 "camera_in": "/camera/rgb",
681 "nav_goal_out": "/nav/goal",
682 "motor_cmd_out": "/motor/cmd_vel",
683 "behavior_out": "/behavior/execute"
684 }
685 });
686
687 let (publishes, subscribes) = service.parse_topics_from_json(&config);
688
689 assert_eq!(publishes.len(), 4);
691 assert!(publishes.iter().any(|t| t.path == "/ai/response"));
692 assert!(publishes.iter().any(|t| t.path == "/nav/goal"));
693 assert!(publishes.iter().any(|t| t.path == "/motor/cmd_vel"));
694 assert!(publishes.iter().any(|t| t.path == "/behavior/execute"));
695
696 assert_eq!(subscribes.len(), 2);
698 assert!(subscribes.iter().any(|t| t.path == "/ai/command"));
699 assert!(subscribes.iter().any(|t| t.path == "/camera/rgb"));
700 }
701
702 #[test]
703 fn test_parse_root_level_format() {
704 let service = create_test_service();
705 let config: serde_json::Value = serde_json::json!({
706 "input_topic": "/robot/sensors/camera/rgb",
707 "output_topic": "/inference/detections",
708 "control_topic": "/inference/cmd"
709 });
710
711 let (publishes, subscribes) = service.parse_topics_from_json(&config);
712
713 assert_eq!(publishes.len(), 1);
714 assert_eq!(subscribes.len(), 2);
715 assert_eq!(publishes[0].path, "/inference/detections");
716 assert!(subscribes.iter().any(|t| t.path == "/robot/sensors/camera/rgb"));
717 assert!(subscribes.iter().any(|t| t.path == "/inference/cmd"));
718 }
719}