mecha10_cli/services/
topology.rs

1//! Topology service for analyzing project structure
2//!
3//! This service provides static analysis of project topology including:
4//! - Nodes and their enabled status
5//! - Pub/sub topics (publishers and subscribers)
6//! - Service ports (Redis, HTTP, database, etc.)
7
8use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::services::ConfigService;
15use crate::types::ProjectConfig;
16
17/// Topology analysis result
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct Topology {
20    pub project_name: String,
21    pub redis: RedisInfo,
22    pub services: Vec<ServiceInfo>,
23    pub nodes: Vec<NodeTopology>,
24    pub topics: Vec<TopicTopology>,
25}
26
27/// Redis connection information
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct RedisInfo {
30    pub url: String,
31    pub host: String,
32    pub port: u16,
33}
34
35/// Service port information
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ServiceInfo {
38    pub name: String,
39    pub host: String,
40    pub port: u16,
41}
42
43/// Node topology information
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct NodeTopology {
46    pub name: String,
47    pub package: String,
48    pub enabled: bool,
49    pub description: Option<String>,
50    pub publishes: Vec<TopicRef>,
51    pub subscribes: Vec<TopicRef>,
52}
53
54/// Topic reference with message type
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
56pub struct TopicRef {
57    pub path: String,
58    pub message_type: Option<String>,
59}
60
61/// Topic topology information (grouped by topic)
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TopicTopology {
64    pub path: String,
65    pub message_type: Option<String>,
66    pub publishers: Vec<String>,
67    pub subscribers: Vec<String>,
68}
69
/// Static analyzer that derives a [`Topology`] from a project on disk.
pub struct TopologyService {
    /// Project directory; `mecha10.json` and the node configs are resolved
    /// relative to it.
    project_root: PathBuf,
}
74
75impl TopologyService {
76    /// Create a new topology service
77    pub fn new(project_root: PathBuf) -> Self {
78        Self { project_root }
79    }
80
81    /// Analyze project topology
82    pub async fn analyze(&self) -> Result<Topology> {
83        // Load project configuration
84        let config_path = self.project_root.join("mecha10.json");
85        let config = ConfigService::load_from(&config_path).await?;
86
87        // Parse Redis URL
88        let redis = self.parse_redis_url(&config.redis.url)?;
89
90        // Extract service ports
91        let services = self.extract_services(&config);
92
93        // Analyze all enabled nodes
94        let nodes = self.analyze_nodes(&config).await?;
95
96        // Build topic-centric view
97        let topics = self.build_topic_view(&nodes);
98
99        Ok(Topology {
100            project_name: config.name,
101            redis,
102            services,
103            nodes,
104            topics,
105        })
106    }
107
108    /// Parse Redis URL into components
109    ///
110    /// Note: This method is public primarily for testing purposes.
111    pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
112        // Handle redis://host:port format
113        let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
114
115        if let Some(caps) = url_pattern.captures(url) {
116            let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
117            let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
118
119            Ok(RedisInfo {
120                url: url.to_string(),
121                host: host.to_string(),
122                port,
123            })
124        } else {
125            // Default fallback
126            Ok(RedisInfo {
127                url: url.to_string(),
128                host: "localhost".to_string(),
129                port: 6379,
130            })
131        }
132    }
133
134    /// Extract service port information from config
135    fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
136        let mut services = Vec::new();
137
138        // HTTP API service
139        if let Some(http_api) = &config.services.http_api {
140            services.push(ServiceInfo {
141                name: "HTTP API".to_string(),
142                host: http_api.host.clone(),
143                port: http_api.port,
144            });
145        }
146
147        // Database service
148        if let Some(db) = &config.services.database {
149            // Try to parse postgres://host:port or similar
150            if let Some(port) = self.extract_port_from_url(&db.url) {
151                services.push(ServiceInfo {
152                    name: "Database".to_string(),
153                    host: self
154                        .extract_host_from_url(&db.url)
155                        .unwrap_or_else(|| "localhost".to_string()),
156                    port,
157                });
158            }
159        }
160
161        services
162    }
163
164    /// Extract host from a URL string
165    fn extract_host_from_url(&self, url: &str) -> Option<String> {
166        let re = Regex::new(r"://([^:/@]+)").ok()?;
167        re.captures(url)
168            .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
169    }
170
171    /// Extract port from a URL string
172    fn extract_port_from_url(&self, url: &str) -> Option<u16> {
173        let re = Regex::new(r":(\d+)").ok()?;
174        re.captures(url)
175            .and_then(|caps| caps.get(1))
176            .and_then(|m| m.as_str().parse().ok())
177    }
178
179    /// Analyze all nodes
180    async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
181        let mut nodes = Vec::new();
182
183        // Analyze all nodes from config
184        for spec in config.nodes.get_node_specs() {
185            let topology = self.analyze_node(&spec.name, &spec.package_path(), None).await?;
186            nodes.push(topology);
187        }
188
189        Ok(nodes)
190    }
191
192    /// Analyze a single node's configuration for topics
193    async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
194        // Try to load node config first
195        let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
196            // Fallback: try source parsing if config loading fails
197            let source_path = match self.resolve_node_source(package_path) {
198                Ok(path) if path.exists() => path,
199                _ => return (Vec::new(), Vec::new()),
200            };
201
202            // Parse source file for topics (blocking operation in async context)
203            match tokio::task::block_in_place(|| {
204                let rt = tokio::runtime::Handle::current();
205                rt.block_on(self.parse_node_source(&source_path))
206            }) {
207                Ok((pubs, subs)) => (pubs, subs),
208                Err(_) => (Vec::new(), Vec::new()),
209            }
210        });
211
212        Ok(NodeTopology {
213            name: name.to_string(),
214            package: package_path.to_string(),
215            enabled: true,
216            description: description.map(String::from),
217            publishes,
218            subscribes,
219        })
220    }
221
222    /// Load topic information from node config file
223    ///
224    /// Supports multiple config formats:
225    ///
226    /// **Format 1: Array format (recommended)**
227    /// ```json
228    /// {
229    ///   "topics": {
230    ///     "publishes": [
231    ///       { "output": "/vision/classification" },
232    ///       { "status": "/motor/status" }
233    ///     ],
234    ///     "subscribes": [
235    ///       { "input": "/camera/rgb" }
236    ///     ]
237    ///   }
238    /// }
239    /// ```
240    ///
241    /// **Format 2: Flat format (for nodes with many semantic topics)**
242    /// ```json
243    /// {
244    ///   "topics": {
245    ///     "command_in": "/ai/command",
246    ///     "response_out": "/ai/response",
247    ///     "camera_in": "/camera/rgb"
248    ///   }
249    /// }
250    /// ```
251    /// Field names ending in `_in` or `_sub` are subscribes, `_out` or `_pub` are publishes.
252    ///
253    /// **Format 3: Root-level fields (legacy)**
254    /// ```json
255    /// {
256    ///   "input_topic": "/camera/rgb",
257    ///   "output_topic": "/inference/detections",
258    ///   "control_topic": "/inference/cmd"
259    /// }
260    /// ```
261    async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
262        // Try multiple config locations:
263        // 1. User project: configs/common/nodes/{node_name}/config.json
264        // 2. Framework monorepo: packages/nodes/{node_name}/configs/common/config.json
265        let user_config_path = self
266            .project_root
267            .join("configs/common/nodes")
268            .join(node_name)
269            .join("config.json");
270
271        let framework_config_path = self
272            .project_root
273            .join("packages/nodes")
274            .join(node_name)
275            .join("configs/common/config.json");
276
277        let config_path = if user_config_path.exists() {
278            user_config_path
279        } else if framework_config_path.exists() {
280            framework_config_path
281        } else {
282            anyhow::bail!(
283                "Config file not found at {} or {}",
284                user_config_path.display(),
285                framework_config_path.display()
286            );
287        };
288
289        // Load config as JSON
290        let content = tokio::fs::read_to_string(&config_path).await?;
291        let config: serde_json::Value = serde_json::from_str(&content)?;
292
293        // Use the helper method to parse topics from JSON
294        Ok(self.parse_topics_from_json(&config))
295    }
296
297    /// Resolve node source file path
298    fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
299        // Handle different path formats:
300        // 1. Relative path: "./nodes/camera" or "./drivers/motor"
301        // 2. Package name: "mecha10-nodes-camera"
302        // 3. Framework package: "@mecha10/camera-node"
303
304        if package_path.starts_with("./") || package_path.starts_with("../") {
305            // Relative path
306            let path = self.project_root.join(package_path).join("src/lib.rs");
307            Ok(path)
308        } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
309            // Framework package - look in workspace packages
310            let package_name = package_path
311                .strip_prefix("@mecha10/")
312                .or_else(|| package_path.strip_prefix("mecha10-"))
313                .unwrap_or(package_path);
314
315            // Strip "nodes-" or "drivers-" prefix if present (e.g., "mecha10-nodes-speaker" -> "speaker")
316            let package_name = package_name
317                .strip_prefix("nodes-")
318                .or_else(|| package_name.strip_prefix("drivers-"))
319                .unwrap_or(package_name);
320
321            // Try to find framework root (MECHA10_FRAMEWORK_PATH or walk up to find workspace)
322            let framework_root = self.find_framework_root()?;
323
324            // Try packages/nodes/{name}/src/lib.rs
325            let nodes_path = framework_root
326                .join("packages/nodes")
327                .join(package_name)
328                .join("src/lib.rs");
329            if nodes_path.exists() {
330                return Ok(nodes_path);
331            }
332
333            // Try packages/drivers/{name}/src/lib.rs
334            let drivers_path = framework_root
335                .join("packages/drivers")
336                .join(package_name)
337                .join("src/lib.rs");
338            if drivers_path.exists() {
339                return Ok(drivers_path);
340            }
341
342            // Try packages/services/{name}/src/lib.rs
343            let services_path = framework_root
344                .join("packages/services")
345                .join(package_name)
346                .join("src/lib.rs");
347            if services_path.exists() {
348                return Ok(services_path);
349            }
350
351            // Not found, return the first attempt
352            Ok(nodes_path)
353        } else {
354            // Assume it's a package name
355            Ok(self.project_root.join("nodes").join(package_path).join("src/lib.rs"))
356        }
357    }
358
359    /// Find the mecha10 framework root directory
360    fn find_framework_root(&self) -> Result<PathBuf> {
361        // First try MECHA10_FRAMEWORK_PATH environment variable
362        if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
363            let path = PathBuf::from(framework_path);
364            if path.exists() {
365                return Ok(path);
366            }
367        }
368
369        // Check if we're already in the framework root (has packages/nodes directory)
370        if self.project_root.join("packages/nodes").exists() {
371            return Ok(self.project_root.clone());
372        }
373
374        // Walk up from project root to find workspace with packages/nodes
375        let mut current = self.project_root.clone();
376        loop {
377            if current.join("packages/nodes").exists() {
378                return Ok(current);
379            }
380
381            match current.parent() {
382                Some(parent) => current = parent.to_path_buf(),
383                None => break,
384            }
385        }
386
387        // Fallback to project root
388        Ok(self.project_root.clone())
389    }
390
391    /// Parse node source file for topic definitions and usage
392    async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
393        let content = tokio::fs::read_to_string(source_path)
394            .await
395            .context(format!("Failed to read source file: {}", source_path.display()))?;
396
397        // Extract topic constants and their types
398        let topic_defs = self.extract_topic_definitions(&content);
399
400        // Find publish calls
401        let publishes = self.extract_publish_calls(&content, &topic_defs);
402
403        // Find subscribe calls
404        let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
405
406        Ok((publishes, subscribes))
407    }
408
409    /// Extract topic constant definitions from source
410    ///
411    /// Note: This method is public primarily for testing purposes.
412    pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
413        let mut topics = HashMap::new();
414
415        // Match: pub const TOPIC_NAME: Topic<MessageType> = Topic::new("/path");
416        let topic_pattern =
417            Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
418
419        for caps in topic_pattern.captures_iter(content) {
420            let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
421            let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
422            let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
423
424            topics.insert(
425                const_name.to_string(),
426                TopicRef {
427                    path: topic_path.to_string(),
428                    message_type: Some(message_type.to_string()),
429                },
430            );
431        }
432
433        topics
434    }
435
436    /// Extract publish_to calls
437    ///
438    /// Note: This method is public primarily for testing purposes.
439    pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
440        let mut publishes = Vec::new();
441        let mut seen = HashSet::new();
442
443        // Match: ctx.publish_to(TOPIC_NAME, ...)
444        // or: publish_to(TOPIC_NAME, ...)
445        // or: ctx.publish_to(topics::TOPIC_NAME, ...)
446        // Use a more flexible pattern that handles whitespace and optional module prefix
447        let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
448
449        for caps in publish_pattern.captures_iter(content) {
450            if let Some(const_name) = caps.get(1) {
451                let const_name = const_name.as_str();
452                if let Some(topic) = topic_defs.get(const_name) {
453                    if seen.insert(topic.path.clone()) {
454                        publishes.push(topic.clone());
455                    }
456                }
457            }
458        }
459
460        publishes
461    }
462
463    /// Extract subscribe calls
464    ///
465    /// Note: This method is public primarily for testing purposes.
466    pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
467        let mut subscribes = Vec::new();
468        let mut seen = HashSet::new();
469
470        // Match: ctx.subscribe::<MessageType>(TOPIC_NAME)
471        // or: subscribe(TOPIC_NAME)
472        // or: ctx.subscribe::<MessageType>(topics::TOPIC_NAME)
473        // Use a more flexible pattern that handles whitespace, generics, and optional module prefix
474        let subscribe_pattern =
475            Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
476
477        for caps in subscribe_pattern.captures_iter(content) {
478            if let Some(const_name) = caps.get(1) {
479                let const_name = const_name.as_str();
480                if let Some(topic) = topic_defs.get(const_name) {
481                    if seen.insert(topic.path.clone()) {
482                        subscribes.push(topic.clone());
483                    }
484                }
485            }
486        }
487
488        subscribes
489    }
490
491    /// Build topic-centric view from node-centric data
492    fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
493        let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
494
495        for node in nodes {
496            // Add publishers
497            for topic_ref in &node.publishes {
498                let topic = topic_map
499                    .entry(topic_ref.path.clone())
500                    .or_insert_with(|| TopicTopology {
501                        path: topic_ref.path.clone(),
502                        message_type: topic_ref.message_type.clone(),
503                        publishers: Vec::new(),
504                        subscribers: Vec::new(),
505                    });
506                if !topic.publishers.contains(&node.name) {
507                    topic.publishers.push(node.name.clone());
508                }
509            }
510
511            // Add subscribers
512            for topic_ref in &node.subscribes {
513                let topic = topic_map
514                    .entry(topic_ref.path.clone())
515                    .or_insert_with(|| TopicTopology {
516                        path: topic_ref.path.clone(),
517                        message_type: topic_ref.message_type.clone(),
518                        publishers: Vec::new(),
519                        subscribers: Vec::new(),
520                    });
521                if !topic.subscribers.contains(&node.name) {
522                    topic.subscribers.push(node.name.clone());
523                }
524            }
525        }
526
527        let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
528        topics.sort_by(|a, b| a.path.cmp(&b.path));
529        topics
530    }
531
532    /// Parse topics from JSON config value
533    ///
534    /// This is a helper method extracted for testing. It contains the core parsing logic
535    /// from `load_topics_from_config` but operates on a JSON value instead of a file path.
536    fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
537        let mut publishes = Vec::new();
538        let mut subscribes = Vec::new();
539
540        // Try Format 1: Array format (topics.publishes/topics.subscribes)
541        if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
542            // Check if using array format
543            if topics.contains_key("publishes") || topics.contains_key("subscribes") {
544                // Format 1: Array format
545                if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
546                    for topic in pubs {
547                        if let Some(topic_obj) = topic.as_object() {
548                            for (_semantic_name, topic_path) in topic_obj {
549                                if let Some(path_str) = topic_path.as_str() {
550                                    publishes.push(TopicRef {
551                                        path: path_str.to_string(),
552                                        message_type: None,
553                                    });
554                                }
555                            }
556                        }
557                    }
558                }
559
560                if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
561                    for topic in subs {
562                        if let Some(topic_obj) = topic.as_object() {
563                            for (_semantic_name, topic_path) in topic_obj {
564                                if let Some(path_str) = topic_path.as_str() {
565                                    subscribes.push(TopicRef {
566                                        path: path_str.to_string(),
567                                        message_type: None,
568                                    });
569                                }
570                            }
571                        }
572                    }
573                }
574            } else {
575                // Format 2: Flat format - infer pub/sub from field names
576                for (field_name, topic_path) in topics {
577                    if let Some(path_str) = topic_path.as_str() {
578                        // Classify based on field name suffix
579                        if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
580                            subscribes.push(TopicRef {
581                                path: path_str.to_string(),
582                                message_type: None,
583                            });
584                        } else if field_name.ends_with("_out")
585                            || field_name.ends_with("_pub")
586                            || field_name.contains("output")
587                        {
588                            publishes.push(TopicRef {
589                                path: path_str.to_string(),
590                                message_type: None,
591                            });
592                        }
593                        // If field name doesn't match patterns, skip it (might be control/status)
594                    }
595                }
596            }
597        }
598
599        // Try Format 3: Root-level topic fields (fallback)
600        if publishes.is_empty() && subscribes.is_empty() {
601            // Check for input_topic, output_topic, control_topic pattern
602            if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
603                subscribes.push(TopicRef {
604                    path: input.to_string(),
605                    message_type: None,
606                });
607            }
608            if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
609                publishes.push(TopicRef {
610                    path: output.to_string(),
611                    message_type: None,
612                });
613            }
614            if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
615                subscribes.push(TopicRef {
616                    path: control.to_string(),
617                    message_type: None,
618                });
619            }
620        }
621
622        (publishes, subscribes)
623    }
624}
625
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Service rooted at a placeholder path; the parser under test takes a
    /// JSON value and does not read from disk.
    fn service() -> TopologyService {
        TopologyService::new(PathBuf::from("/tmp/test"))
    }

    /// Topic paths in their original order.
    fn paths(refs: &[TopicRef]) -> Vec<&str> {
        refs.iter().map(|topic| topic.path.as_str()).collect()
    }

    /// Topic paths sorted, for layouts whose iteration order is unspecified.
    fn sorted_paths(refs: &[TopicRef]) -> Vec<&str> {
        let mut sorted = paths(refs);
        sorted.sort_unstable();
        sorted
    }

    #[test]
    fn test_parse_array_format() {
        let config = serde_json::json!({
            "topics": {
                "publishes": [
                    { "output": "/vision/classification" },
                    { "status": "/motor/status" }
                ],
                "subscribes": [
                    { "input": "/camera/rgb" }
                ]
            }
        });

        let (publishes, subscribes) = service().parse_topics_from_json(&config);

        // Array entries keep their declaration order.
        assert_eq!(paths(&publishes), ["/vision/classification", "/motor/status"]);
        assert_eq!(paths(&subscribes), ["/camera/rgb"]);
    }

    #[test]
    fn test_parse_flat_format() {
        let config = serde_json::json!({
            "topics": {
                "command_in": "/ai/command",
                "response_out": "/ai/response",
                "camera_in": "/camera/rgb",
                "nav_goal_out": "/nav/goal",
                "motor_cmd_out": "/motor/cmd_vel",
                "behavior_out": "/behavior/execute"
            }
        });

        let (publishes, subscribes) = service().parse_topics_from_json(&config);

        // Four `_out` fields are publications.
        assert_eq!(
            sorted_paths(&publishes),
            ["/ai/response", "/behavior/execute", "/motor/cmd_vel", "/nav/goal"]
        );
        // Two `_in` fields are subscriptions.
        assert_eq!(sorted_paths(&subscribes), ["/ai/command", "/camera/rgb"]);
    }

    #[test]
    fn test_parse_root_level_format() {
        let config = serde_json::json!({
            "input_topic": "/robot/sensors/camera/rgb",
            "output_topic": "/inference/detections",
            "control_topic": "/inference/cmd"
        });

        let (publishes, subscribes) = service().parse_topics_from_json(&config);

        assert_eq!(paths(&publishes), ["/inference/detections"]);
        // Both the input and the control topic count as subscriptions.
        assert_eq!(
            sorted_paths(&subscribes),
            ["/inference/cmd", "/robot/sensors/camera/rgb"]
        );
    }
}