mecha10_cli/services/
topology.rs

1//! Topology service for analyzing project structure
2//!
3//! This service provides static analysis of project topology including:
4//! - Nodes and their enabled status
5//! - Pub/sub topics (publishers and subscribers)
6//! - Service ports (Redis, HTTP, database, etc.)
7
8use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::services::ConfigService;
15use crate::types::ProjectConfig;
16
17/// Topology analysis result
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct Topology {
20    pub project_name: String,
21    pub redis: RedisInfo,
22    pub services: Vec<ServiceInfo>,
23    pub nodes: Vec<NodeTopology>,
24    pub topics: Vec<TopicTopology>,
25}
26
27/// Redis connection information
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct RedisInfo {
30    pub url: String,
31    pub host: String,
32    pub port: u16,
33}
34
35/// Service port information
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ServiceInfo {
38    pub name: String,
39    pub host: String,
40    pub port: u16,
41}
42
43/// Node topology information
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct NodeTopology {
46    pub name: String,
47    pub package: String,
48    pub enabled: bool,
49    pub description: Option<String>,
50    pub publishes: Vec<TopicRef>,
51    pub subscribes: Vec<TopicRef>,
52}
53
54/// Topic reference with message type
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
56pub struct TopicRef {
57    pub path: String,
58    pub message_type: Option<String>,
59}
60
61/// Topic topology information (grouped by topic)
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TopicTopology {
64    pub path: String,
65    pub message_type: Option<String>,
66    pub publishers: Vec<String>,
67    pub subscribers: Vec<String>,
68}
69
70/// Topology service for static analysis
71pub struct TopologyService {
72    project_root: PathBuf,
73}
74
75impl TopologyService {
76    /// Create a new topology service
77    pub fn new(project_root: PathBuf) -> Self {
78        Self { project_root }
79    }
80
81    /// Analyze project topology
82    pub async fn analyze(&self) -> Result<Topology> {
83        // Load project configuration
84        let config_path = self.project_root.join("mecha10.json");
85        let config = ConfigService::load_from(&config_path).await?;
86
87        // Parse Redis URL
88        let redis = self.parse_redis_url(&config.redis.url)?;
89
90        // Extract service ports
91        let services = self.extract_services(&config);
92
93        // Analyze all enabled nodes
94        let nodes = self.analyze_nodes(&config).await?;
95
96        // Build topic-centric view
97        let topics = self.build_topic_view(&nodes);
98
99        Ok(Topology {
100            project_name: config.name,
101            redis,
102            services,
103            nodes,
104            topics,
105        })
106    }
107
108    /// Parse Redis URL into components
109    ///
110    /// Note: This method is public primarily for testing purposes.
111    pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
112        // Handle redis://host:port format
113        let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
114
115        if let Some(caps) = url_pattern.captures(url) {
116            let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
117            let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
118
119            Ok(RedisInfo {
120                url: url.to_string(),
121                host: host.to_string(),
122                port,
123            })
124        } else {
125            // Default fallback
126            Ok(RedisInfo {
127                url: url.to_string(),
128                host: "localhost".to_string(),
129                port: 6379,
130            })
131        }
132    }
133
134    /// Extract service port information from config
135    fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
136        let mut services = Vec::new();
137
138        // HTTP API service
139        if let Some(http_api) = &config.services.http_api {
140            services.push(ServiceInfo {
141                name: "HTTP API".to_string(),
142                host: http_api.host.clone(),
143                port: http_api.port,
144            });
145        }
146
147        // Database service
148        if let Some(db) = &config.services.database {
149            // Try to parse postgres://host:port or similar
150            if let Some(port) = self.extract_port_from_url(&db.url) {
151                services.push(ServiceInfo {
152                    name: "Database".to_string(),
153                    host: self
154                        .extract_host_from_url(&db.url)
155                        .unwrap_or_else(|| "localhost".to_string()),
156                    port,
157                });
158            }
159        }
160
161        services
162    }
163
164    /// Extract host from a URL string
165    fn extract_host_from_url(&self, url: &str) -> Option<String> {
166        let re = Regex::new(r"://([^:/@]+)").ok()?;
167        re.captures(url)
168            .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
169    }
170
171    /// Extract port from a URL string
172    fn extract_port_from_url(&self, url: &str) -> Option<u16> {
173        let re = Regex::new(r":(\d+)").ok()?;
174        re.captures(url)
175            .and_then(|caps| caps.get(1))
176            .and_then(|m| m.as_str().parse().ok())
177    }
178
179    /// Analyze all enabled nodes
180    async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
181        let mut nodes = Vec::new();
182
183        // Analyze driver nodes
184        for node in &config.nodes.drivers {
185            if node.enabled {
186                let topology = self
187                    .analyze_node(&node.name, &node.path, node.description.as_deref())
188                    .await?;
189                nodes.push(topology);
190            }
191        }
192
193        // Analyze custom nodes
194        for node in &config.nodes.custom {
195            if node.enabled {
196                let topology = self
197                    .analyze_node(&node.name, &node.path, node.description.as_deref())
198                    .await?;
199                nodes.push(topology);
200            }
201        }
202
203        Ok(nodes)
204    }
205
206    /// Analyze a single node's configuration for topics
207    async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
208        // Try to load node config first
209        let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
210            // Fallback: try source parsing if config loading fails
211            let source_path = match self.resolve_node_source(package_path) {
212                Ok(path) if path.exists() => path,
213                _ => return (Vec::new(), Vec::new()),
214            };
215
216            // Parse source file for topics (blocking operation in async context)
217            match tokio::task::block_in_place(|| {
218                let rt = tokio::runtime::Handle::current();
219                rt.block_on(self.parse_node_source(&source_path))
220            }) {
221                Ok((pubs, subs)) => (pubs, subs),
222                Err(_) => (Vec::new(), Vec::new()),
223            }
224        });
225
226        Ok(NodeTopology {
227            name: name.to_string(),
228            package: package_path.to_string(),
229            enabled: true,
230            description: description.map(String::from),
231            publishes,
232            subscribes,
233        })
234    }
235
236    /// Load topic information from node config file
237    ///
238    /// Supports multiple config formats:
239    ///
240    /// **Format 1: Array format (recommended)**
241    /// ```json
242    /// {
243    ///   "topics": {
244    ///     "publishes": [
245    ///       { "output": "/vision/classification" },
246    ///       { "status": "/motor/status" }
247    ///     ],
248    ///     "subscribes": [
249    ///       { "input": "/camera/rgb" }
250    ///     ]
251    ///   }
252    /// }
253    /// ```
254    ///
255    /// **Format 2: Flat format (for nodes with many semantic topics)**
256    /// ```json
257    /// {
258    ///   "topics": {
259    ///     "command_in": "/ai/command",
260    ///     "response_out": "/ai/response",
261    ///     "camera_in": "/camera/rgb"
262    ///   }
263    /// }
264    /// ```
265    /// Field names ending in `_in` or `_sub` are subscribes, `_out` or `_pub` are publishes.
266    ///
267    /// **Format 3: Root-level fields (legacy)**
268    /// ```json
269    /// {
270    ///   "input_topic": "/camera/rgb",
271    ///   "output_topic": "/inference/detections",
272    ///   "control_topic": "/inference/cmd"
273    /// }
274    /// ```
275    async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
276        // Try multiple config locations:
277        // 1. User project: configs/common/nodes/{node_name}/config.json
278        // 2. Framework monorepo: packages/nodes/{node_name}/configs/common/config.json
279        let user_config_path = self
280            .project_root
281            .join("configs/common/nodes")
282            .join(node_name)
283            .join("config.json");
284
285        let framework_config_path = self
286            .project_root
287            .join("packages/nodes")
288            .join(node_name)
289            .join("configs/common/config.json");
290
291        let config_path = if user_config_path.exists() {
292            user_config_path
293        } else if framework_config_path.exists() {
294            framework_config_path
295        } else {
296            anyhow::bail!(
297                "Config file not found at {} or {}",
298                user_config_path.display(),
299                framework_config_path.display()
300            );
301        };
302
303        // Load config as JSON
304        let content = tokio::fs::read_to_string(&config_path).await?;
305        let config: serde_json::Value = serde_json::from_str(&content)?;
306
307        // Use the helper method to parse topics from JSON
308        Ok(self.parse_topics_from_json(&config))
309    }
310
311    /// Resolve node source file path
312    fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
313        // Handle different path formats:
314        // 1. Relative path: "./nodes/camera" or "./drivers/motor"
315        // 2. Package name: "mecha10-nodes-camera"
316        // 3. Framework package: "@mecha10/camera-node"
317
318        if package_path.starts_with("./") || package_path.starts_with("../") {
319            // Relative path
320            let path = self.project_root.join(package_path).join("src/lib.rs");
321            Ok(path)
322        } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
323            // Framework package - look in workspace packages
324            let package_name = package_path
325                .strip_prefix("@mecha10/")
326                .or_else(|| package_path.strip_prefix("mecha10-"))
327                .unwrap_or(package_path);
328
329            // Strip "nodes-" or "drivers-" prefix if present (e.g., "mecha10-nodes-speaker" -> "speaker")
330            let package_name = package_name
331                .strip_prefix("nodes-")
332                .or_else(|| package_name.strip_prefix("drivers-"))
333                .unwrap_or(package_name);
334
335            // Try to find framework root (MECHA10_FRAMEWORK_PATH or walk up to find workspace)
336            let framework_root = self.find_framework_root()?;
337
338            // Try packages/nodes/{name}/src/lib.rs
339            let nodes_path = framework_root
340                .join("packages/nodes")
341                .join(package_name)
342                .join("src/lib.rs");
343            if nodes_path.exists() {
344                return Ok(nodes_path);
345            }
346
347            // Try packages/drivers/{name}/src/lib.rs
348            let drivers_path = framework_root
349                .join("packages/drivers")
350                .join(package_name)
351                .join("src/lib.rs");
352            if drivers_path.exists() {
353                return Ok(drivers_path);
354            }
355
356            // Try packages/services/{name}/src/lib.rs
357            let services_path = framework_root
358                .join("packages/services")
359                .join(package_name)
360                .join("src/lib.rs");
361            if services_path.exists() {
362                return Ok(services_path);
363            }
364
365            // Not found, return the first attempt
366            Ok(nodes_path)
367        } else {
368            // Assume it's a package name
369            Ok(self.project_root.join("nodes").join(package_path).join("src/lib.rs"))
370        }
371    }
372
373    /// Find the mecha10 framework root directory
374    fn find_framework_root(&self) -> Result<PathBuf> {
375        // First try MECHA10_FRAMEWORK_PATH environment variable
376        if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
377            let path = PathBuf::from(framework_path);
378            if path.exists() {
379                return Ok(path);
380            }
381        }
382
383        // Check if we're already in the framework root (has packages/nodes directory)
384        if self.project_root.join("packages/nodes").exists() {
385            return Ok(self.project_root.clone());
386        }
387
388        // Walk up from project root to find workspace with packages/nodes
389        let mut current = self.project_root.clone();
390        loop {
391            if current.join("packages/nodes").exists() {
392                return Ok(current);
393            }
394
395            match current.parent() {
396                Some(parent) => current = parent.to_path_buf(),
397                None => break,
398            }
399        }
400
401        // Fallback to project root
402        Ok(self.project_root.clone())
403    }
404
405    /// Parse node source file for topic definitions and usage
406    async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
407        let content = tokio::fs::read_to_string(source_path)
408            .await
409            .context(format!("Failed to read source file: {}", source_path.display()))?;
410
411        // Extract topic constants and their types
412        let topic_defs = self.extract_topic_definitions(&content);
413
414        // Find publish calls
415        let publishes = self.extract_publish_calls(&content, &topic_defs);
416
417        // Find subscribe calls
418        let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
419
420        Ok((publishes, subscribes))
421    }
422
423    /// Extract topic constant definitions from source
424    ///
425    /// Note: This method is public primarily for testing purposes.
426    pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
427        let mut topics = HashMap::new();
428
429        // Match: pub const TOPIC_NAME: Topic<MessageType> = Topic::new("/path");
430        let topic_pattern =
431            Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
432
433        for caps in topic_pattern.captures_iter(content) {
434            let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
435            let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
436            let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
437
438            topics.insert(
439                const_name.to_string(),
440                TopicRef {
441                    path: topic_path.to_string(),
442                    message_type: Some(message_type.to_string()),
443                },
444            );
445        }
446
447        topics
448    }
449
450    /// Extract publish_to calls
451    ///
452    /// Note: This method is public primarily for testing purposes.
453    pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
454        let mut publishes = Vec::new();
455        let mut seen = HashSet::new();
456
457        // Match: ctx.publish_to(TOPIC_NAME, ...)
458        // or: publish_to(TOPIC_NAME, ...)
459        // or: ctx.publish_to(topics::TOPIC_NAME, ...)
460        // Use a more flexible pattern that handles whitespace and optional module prefix
461        let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
462
463        for caps in publish_pattern.captures_iter(content) {
464            if let Some(const_name) = caps.get(1) {
465                let const_name = const_name.as_str();
466                if let Some(topic) = topic_defs.get(const_name) {
467                    if seen.insert(topic.path.clone()) {
468                        publishes.push(topic.clone());
469                    }
470                }
471            }
472        }
473
474        publishes
475    }
476
477    /// Extract subscribe calls
478    ///
479    /// Note: This method is public primarily for testing purposes.
480    pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
481        let mut subscribes = Vec::new();
482        let mut seen = HashSet::new();
483
484        // Match: ctx.subscribe::<MessageType>(TOPIC_NAME)
485        // or: subscribe(TOPIC_NAME)
486        // or: ctx.subscribe::<MessageType>(topics::TOPIC_NAME)
487        // Use a more flexible pattern that handles whitespace, generics, and optional module prefix
488        let subscribe_pattern =
489            Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
490
491        for caps in subscribe_pattern.captures_iter(content) {
492            if let Some(const_name) = caps.get(1) {
493                let const_name = const_name.as_str();
494                if let Some(topic) = topic_defs.get(const_name) {
495                    if seen.insert(topic.path.clone()) {
496                        subscribes.push(topic.clone());
497                    }
498                }
499            }
500        }
501
502        subscribes
503    }
504
505    /// Build topic-centric view from node-centric data
506    fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
507        let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
508
509        for node in nodes {
510            // Add publishers
511            for topic_ref in &node.publishes {
512                let topic = topic_map
513                    .entry(topic_ref.path.clone())
514                    .or_insert_with(|| TopicTopology {
515                        path: topic_ref.path.clone(),
516                        message_type: topic_ref.message_type.clone(),
517                        publishers: Vec::new(),
518                        subscribers: Vec::new(),
519                    });
520                if !topic.publishers.contains(&node.name) {
521                    topic.publishers.push(node.name.clone());
522                }
523            }
524
525            // Add subscribers
526            for topic_ref in &node.subscribes {
527                let topic = topic_map
528                    .entry(topic_ref.path.clone())
529                    .or_insert_with(|| TopicTopology {
530                        path: topic_ref.path.clone(),
531                        message_type: topic_ref.message_type.clone(),
532                        publishers: Vec::new(),
533                        subscribers: Vec::new(),
534                    });
535                if !topic.subscribers.contains(&node.name) {
536                    topic.subscribers.push(node.name.clone());
537                }
538            }
539        }
540
541        let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
542        topics.sort_by(|a, b| a.path.cmp(&b.path));
543        topics
544    }
545
546    /// Parse topics from JSON config value
547    ///
548    /// This is a helper method extracted for testing. It contains the core parsing logic
549    /// from `load_topics_from_config` but operates on a JSON value instead of a file path.
550    fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
551        let mut publishes = Vec::new();
552        let mut subscribes = Vec::new();
553
554        // Try Format 1: Array format (topics.publishes/topics.subscribes)
555        if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
556            // Check if using array format
557            if topics.contains_key("publishes") || topics.contains_key("subscribes") {
558                // Format 1: Array format
559                if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
560                    for topic in pubs {
561                        if let Some(topic_obj) = topic.as_object() {
562                            for (_semantic_name, topic_path) in topic_obj {
563                                if let Some(path_str) = topic_path.as_str() {
564                                    publishes.push(TopicRef {
565                                        path: path_str.to_string(),
566                                        message_type: None,
567                                    });
568                                }
569                            }
570                        }
571                    }
572                }
573
574                if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
575                    for topic in subs {
576                        if let Some(topic_obj) = topic.as_object() {
577                            for (_semantic_name, topic_path) in topic_obj {
578                                if let Some(path_str) = topic_path.as_str() {
579                                    subscribes.push(TopicRef {
580                                        path: path_str.to_string(),
581                                        message_type: None,
582                                    });
583                                }
584                            }
585                        }
586                    }
587                }
588            } else {
589                // Format 2: Flat format - infer pub/sub from field names
590                for (field_name, topic_path) in topics {
591                    if let Some(path_str) = topic_path.as_str() {
592                        // Classify based on field name suffix
593                        if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
594                            subscribes.push(TopicRef {
595                                path: path_str.to_string(),
596                                message_type: None,
597                            });
598                        } else if field_name.ends_with("_out")
599                            || field_name.ends_with("_pub")
600                            || field_name.contains("output")
601                        {
602                            publishes.push(TopicRef {
603                                path: path_str.to_string(),
604                                message_type: None,
605                            });
606                        }
607                        // If field name doesn't match patterns, skip it (might be control/status)
608                    }
609                }
610            }
611        }
612
613        // Try Format 3: Root-level topic fields (fallback)
614        if publishes.is_empty() && subscribes.is_empty() {
615            // Check for input_topic, output_topic, control_topic pattern
616            if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
617                subscribes.push(TopicRef {
618                    path: input.to_string(),
619                    message_type: None,
620                });
621            }
622            if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
623                publishes.push(TopicRef {
624                    path: output.to_string(),
625                    message_type: None,
626                });
627            }
628            if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
629                subscribes.push(TopicRef {
630                    path: control.to_string(),
631                    message_type: None,
632                });
633            }
634        }
635
636        (publishes, subscribes)
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use std::path::PathBuf;
644
645    fn create_test_service() -> TopologyService {
646        TopologyService::new(PathBuf::from("/tmp/test"))
647    }
648
649    #[test]
650    fn test_parse_array_format() {
651        let service = create_test_service();
652        let config: serde_json::Value = serde_json::json!({
653            "topics": {
654                "publishes": [
655                    { "output": "/vision/classification" },
656                    { "status": "/motor/status" }
657                ],
658                "subscribes": [
659                    { "input": "/camera/rgb" }
660                ]
661            }
662        });
663
664        let (publishes, subscribes) = service.parse_topics_from_json(&config);
665
666        assert_eq!(publishes.len(), 2);
667        assert_eq!(subscribes.len(), 1);
668        assert_eq!(publishes[0].path, "/vision/classification");
669        assert_eq!(publishes[1].path, "/motor/status");
670        assert_eq!(subscribes[0].path, "/camera/rgb");
671    }
672
673    #[test]
674    fn test_parse_flat_format() {
675        let service = create_test_service();
676        let config: serde_json::Value = serde_json::json!({
677            "topics": {
678                "command_in": "/ai/command",
679                "response_out": "/ai/response",
680                "camera_in": "/camera/rgb",
681                "nav_goal_out": "/nav/goal",
682                "motor_cmd_out": "/motor/cmd_vel",
683                "behavior_out": "/behavior/execute"
684            }
685        });
686
687        let (publishes, subscribes) = service.parse_topics_from_json(&config);
688
689        // Should have 4 publishes (_out suffix)
690        assert_eq!(publishes.len(), 4);
691        assert!(publishes.iter().any(|t| t.path == "/ai/response"));
692        assert!(publishes.iter().any(|t| t.path == "/nav/goal"));
693        assert!(publishes.iter().any(|t| t.path == "/motor/cmd_vel"));
694        assert!(publishes.iter().any(|t| t.path == "/behavior/execute"));
695
696        // Should have 2 subscribes (_in suffix)
697        assert_eq!(subscribes.len(), 2);
698        assert!(subscribes.iter().any(|t| t.path == "/ai/command"));
699        assert!(subscribes.iter().any(|t| t.path == "/camera/rgb"));
700    }
701
702    #[test]
703    fn test_parse_root_level_format() {
704        let service = create_test_service();
705        let config: serde_json::Value = serde_json::json!({
706            "input_topic": "/robot/sensors/camera/rgb",
707            "output_topic": "/inference/detections",
708            "control_topic": "/inference/cmd"
709        });
710
711        let (publishes, subscribes) = service.parse_topics_from_json(&config);
712
713        assert_eq!(publishes.len(), 1);
714        assert_eq!(subscribes.len(), 2);
715        assert_eq!(publishes[0].path, "/inference/detections");
716        assert!(subscribes.iter().any(|t| t.path == "/robot/sensors/camera/rgb"));
717        assert!(subscribes.iter().any(|t| t.path == "/inference/cmd"));
718    }
719}