mecha10_cli/services/
topology.rs

1//! Topology service for analyzing project structure
2//!
3//! This service provides static analysis of project topology including:
4//! - Nodes and their enabled status
5//! - Pub/sub topics (publishers and subscribers)
6//! - Service ports (Redis, HTTP, database, etc.)
7
8use anyhow::{Context, Result};
9use regex::Regex;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::path::{Path, PathBuf};
13
14use crate::paths;
15use crate::services::ConfigService;
16use crate::types::ProjectConfig;
17
/// Topology analysis result
///
/// Aggregated output of [`TopologyService::analyze`]: the project's Redis connection,
/// its configured services, and both a node-centric and a topic-centric view of the
/// pub/sub graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Topology {
    /// Project name, copied from the loaded project configuration.
    pub project_name: String,
    /// Redis connection details parsed from the configured Redis URL.
    pub redis: RedisInfo,
    /// Service endpoints (host and port) extracted from the project configuration.
    pub services: Vec<ServiceInfo>,
    /// Node-centric view: every analyzed node with its published/subscribed topics.
    pub nodes: Vec<NodeTopology>,
    /// Topic-centric view derived from `nodes`, sorted by topic path.
    pub topics: Vec<TopicTopology>,
}
27
/// Redis connection information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisInfo {
    /// The Redis URL exactly as it was supplied to the parser.
    pub url: String,
    /// Host extracted from the URL; `localhost` when it cannot be determined.
    pub host: String,
    /// Port extracted from the URL; `6379` when it cannot be determined.
    pub port: u16,
}
35
/// Service port information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    /// Human-readable service label (this file produces `"HTTP API"` and `"Database"`).
    pub name: String,
    /// Host the service is reachable on.
    pub host: String,
    /// Port the service listens on.
    pub port: u16,
}
43
/// Node topology information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeTopology {
    /// Node name as given by its node spec.
    pub name: String,
    /// Package path of the node as given by its node spec.
    pub package: String,
    /// Whether the node is enabled.
    pub enabled: bool,
    /// Optional free-form description of the node.
    pub description: Option<String>,
    /// Topics this node publishes to.
    pub publishes: Vec<TopicRef>,
    /// Topics this node subscribes to.
    pub subscribes: Vec<TopicRef>,
}
54
/// Topic reference with message type
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct TopicRef {
    /// Topic path, e.g. `/camera/rgb`.
    pub path: String,
    /// Message type carried by the topic. `None` when the topic was read from a JSON
    /// config (which carries no type information); `Some` when it was parsed from a
    /// `Topic<MessageType>` constant in source.
    pub message_type: Option<String>,
}
61
/// Topic topology information (grouped by topic)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicTopology {
    /// Topic path, e.g. `/camera/rgb`.
    pub path: String,
    /// Message type of the topic, when known.
    pub message_type: Option<String>,
    /// Names of the nodes publishing to this topic (no duplicates).
    pub publishers: Vec<String>,
    /// Names of the nodes subscribing to this topic (no duplicates).
    pub subscribers: Vec<String>,
}
70
/// Topology service for static analysis
///
/// Inspects a project's configuration files (and, as a fallback, node source files)
/// without running any node.
pub struct TopologyService {
    /// Root directory of the project; all config and source lookups are resolved
    /// relative to it.
    project_root: PathBuf,
}
75
76impl TopologyService {
77    /// Create a new topology service
78    pub fn new(project_root: PathBuf) -> Self {
79        Self { project_root }
80    }
81
82    /// Analyze project topology
83    pub async fn analyze(&self) -> Result<Topology> {
84        // Load project configuration
85        let config_path = self.project_root.join(paths::PROJECT_CONFIG);
86        let config = ConfigService::load_from(&config_path).await?;
87
88        // Parse Redis URL (derived from environments config)
89        let redis = self.parse_redis_url(&config.environments.redis_url())?;
90
91        // Extract service ports
92        let services = self.extract_services(&config);
93
94        // Analyze all enabled nodes
95        let nodes = self.analyze_nodes(&config).await?;
96
97        // Build topic-centric view
98        let topics = self.build_topic_view(&nodes);
99
100        Ok(Topology {
101            project_name: config.name,
102            redis,
103            services,
104            nodes,
105            topics,
106        })
107    }
108
109    /// Parse Redis URL into components
110    ///
111    /// Note: This method is public primarily for testing purposes.
112    pub fn parse_redis_url(&self, url: &str) -> Result<RedisInfo> {
113        // Handle redis://host:port format
114        let url_pattern = Regex::new(r"redis://([^:]+):(\d+)")?;
115
116        if let Some(caps) = url_pattern.captures(url) {
117            let host = caps.get(1).map(|m| m.as_str()).unwrap_or("localhost");
118            let port = caps.get(2).and_then(|m| m.as_str().parse::<u16>().ok()).unwrap_or(6379);
119
120            Ok(RedisInfo {
121                url: url.to_string(),
122                host: host.to_string(),
123                port,
124            })
125        } else {
126            // Default fallback
127            Ok(RedisInfo {
128                url: url.to_string(),
129                host: "localhost".to_string(),
130                port: 6379,
131            })
132        }
133    }
134
135    /// Extract service port information from config
136    fn extract_services(&self, config: &ProjectConfig) -> Vec<ServiceInfo> {
137        let mut services = Vec::new();
138
139        // HTTP API service
140        if let Some(http_api) = &config.services.http_api {
141            services.push(ServiceInfo {
142                name: "HTTP API".to_string(),
143                host: http_api.host.clone(),
144                port: http_api.port,
145            });
146        }
147
148        // Database service
149        if let Some(db) = &config.services.database {
150            // Try to parse postgres://host:port or similar
151            if let Some(port) = self.extract_port_from_url(&db.url) {
152                services.push(ServiceInfo {
153                    name: "Database".to_string(),
154                    host: self
155                        .extract_host_from_url(&db.url)
156                        .unwrap_or_else(|| "localhost".to_string()),
157                    port,
158                });
159            }
160        }
161
162        services
163    }
164
165    /// Extract host from a URL string
166    fn extract_host_from_url(&self, url: &str) -> Option<String> {
167        let re = Regex::new(r"://([^:/@]+)").ok()?;
168        re.captures(url)
169            .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
170    }
171
172    /// Extract port from a URL string
173    fn extract_port_from_url(&self, url: &str) -> Option<u16> {
174        let re = Regex::new(r":(\d+)").ok()?;
175        re.captures(url)
176            .and_then(|caps| caps.get(1))
177            .and_then(|m| m.as_str().parse().ok())
178    }
179
180    /// Analyze all nodes
181    async fn analyze_nodes(&self, config: &ProjectConfig) -> Result<Vec<NodeTopology>> {
182        let mut nodes = Vec::new();
183
184        // Analyze all nodes from config
185        for spec in config.nodes.get_node_specs() {
186            let topology = self.analyze_node(&spec.name, &spec.package_path(), None).await?;
187            nodes.push(topology);
188        }
189
190        Ok(nodes)
191    }
192
193    /// Analyze a single node's configuration for topics
194    async fn analyze_node(&self, name: &str, package_path: &str, description: Option<&str>) -> Result<NodeTopology> {
195        // Try to load node config first
196        let (publishes, subscribes) = self.load_topics_from_config(name).await.unwrap_or_else(|_| {
197            // Fallback: try source parsing if config loading fails
198            let source_path = match self.resolve_node_source(package_path) {
199                Ok(path) if path.exists() => path,
200                _ => return (Vec::new(), Vec::new()),
201            };
202
203            // Parse source file for topics (blocking operation in async context)
204            match tokio::task::block_in_place(|| {
205                let rt = tokio::runtime::Handle::current();
206                rt.block_on(self.parse_node_source(&source_path))
207            }) {
208                Ok((pubs, subs)) => (pubs, subs),
209                Err(_) => (Vec::new(), Vec::new()),
210            }
211        });
212
213        Ok(NodeTopology {
214            name: name.to_string(),
215            package: package_path.to_string(),
216            enabled: true,
217            description: description.map(String::from),
218            publishes,
219            subscribes,
220        })
221    }
222
223    /// Load topic information from node config file
224    ///
225    /// Supports multiple config formats:
226    ///
227    /// **Format 1: Array format (recommended)**
228    /// ```json
229    /// {
230    ///   "topics": {
231    ///     "publishes": [
232    ///       { "output": "/vision/classification" },
233    ///       { "status": "/motor/status" }
234    ///     ],
235    ///     "subscribes": [
236    ///       { "input": "/camera/rgb" }
237    ///     ]
238    ///   }
239    /// }
240    /// ```
241    ///
242    /// **Format 2: Flat format (for nodes with many semantic topics)**
243    /// ```json
244    /// {
245    ///   "topics": {
246    ///     "command_in": "/ai/command",
247    ///     "response_out": "/ai/response",
248    ///     "camera_in": "/camera/rgb"
249    ///   }
250    /// }
251    /// ```
252    /// Field names ending in `_in` or `_sub` are subscribes, `_out` or `_pub` are publishes.
253    ///
254    /// **Format 3: Root-level fields (legacy)**
255    /// ```json
256    /// {
257    ///   "input_topic": "/camera/rgb",
258    ///   "output_topic": "/inference/detections",
259    ///   "control_topic": "/inference/cmd"
260    /// }
261    /// ```
262    async fn load_topics_from_config(&self, node_name: &str) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
263        // Try multiple config locations (new format with dev/production sections):
264        // 1. User project: configs/nodes/@mecha10/{node_name}/config.json
265        // 2. User project: configs/nodes/@local/{node_name}/config.json
266        // 3. Framework monorepo: packages/nodes/{node_name}/configs/config.json
267        let mecha10_config_path = self
268            .project_root
269            .join("configs/nodes/@mecha10")
270            .join(node_name)
271            .join("config.json");
272
273        let local_config_path = self
274            .project_root
275            .join("configs/nodes/@local")
276            .join(node_name)
277            .join("config.json");
278
279        let framework_config_path = self
280            .project_root
281            .join(paths::framework::NODES_DIR)
282            .join(node_name)
283            .join("configs/config.json");
284
285        let config_path = if mecha10_config_path.exists() {
286            mecha10_config_path
287        } else if local_config_path.exists() {
288            local_config_path
289        } else if framework_config_path.exists() {
290            framework_config_path
291        } else {
292            anyhow::bail!(
293                "Config file not found at {}, {}, or {}",
294                mecha10_config_path.display(),
295                local_config_path.display(),
296                framework_config_path.display()
297            );
298        };
299
300        // Load config as JSON
301        let content = tokio::fs::read_to_string(&config_path).await?;
302        let config: serde_json::Value = serde_json::from_str(&content)?;
303
304        // Extract environment-specific config (default to dev)
305        let profile = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
306        let env_config = if config.get("dev").is_some() || config.get("production").is_some() {
307            // New format: { "dev": {...}, "production": {...} }
308            let section = match profile.as_str() {
309                "production" | "prod" => config.get("production"),
310                _ => config.get("dev"),
311            };
312            section.cloned().unwrap_or(config)
313        } else {
314            // Legacy format: direct config object
315            config
316        };
317
318        // Use the helper method to parse topics from JSON
319        Ok(self.parse_topics_from_json(&env_config))
320    }
321
322    /// Resolve node source file path
323    fn resolve_node_source(&self, package_path: &str) -> Result<PathBuf> {
324        // Handle different path formats:
325        // 1. Relative path: "./nodes/camera" or "./drivers/motor"
326        // 2. Package name: "mecha10-nodes-camera"
327        // 3. Framework package: "@mecha10/camera-node"
328
329        if package_path.starts_with("./") || package_path.starts_with("../") {
330            // Relative path
331            let path = self.project_root.join(package_path).join("src/lib.rs");
332            Ok(path)
333        } else if package_path.starts_with("@mecha10/") || package_path.starts_with("mecha10-") {
334            // Framework package - look in workspace packages
335            let package_name = package_path
336                .strip_prefix("@mecha10/")
337                .or_else(|| package_path.strip_prefix("mecha10-"))
338                .unwrap_or(package_path);
339
340            // Strip "nodes-" or "drivers-" prefix if present (e.g., "mecha10-nodes-speaker" -> "speaker")
341            let package_name = package_name
342                .strip_prefix("nodes-")
343                .or_else(|| package_name.strip_prefix("drivers-"))
344                .unwrap_or(package_name);
345
346            // Try to find framework root (MECHA10_FRAMEWORK_PATH or walk up to find workspace)
347            let framework_root = self.find_framework_root()?;
348
349            // Try packages/nodes/{name}/src/lib.rs
350            let nodes_path = framework_root
351                .join(paths::framework::NODES_DIR)
352                .join(package_name)
353                .join("src/lib.rs");
354            if nodes_path.exists() {
355                return Ok(nodes_path);
356            }
357
358            // Try packages/drivers/{name}/src/lib.rs
359            let drivers_path = framework_root
360                .join(paths::framework::DRIVERS_DIR)
361                .join(package_name)
362                .join("src/lib.rs");
363            if drivers_path.exists() {
364                return Ok(drivers_path);
365            }
366
367            // Try packages/services/{name}/src/lib.rs
368            let services_path = framework_root
369                .join(paths::framework::SERVICES_DIR)
370                .join(package_name)
371                .join("src/lib.rs");
372            if services_path.exists() {
373                return Ok(services_path);
374            }
375
376            // Not found, return the first attempt
377            Ok(nodes_path)
378        } else {
379            // Assume it's a package name
380            Ok(self
381                .project_root
382                .join(paths::project::NODES_DIR)
383                .join(package_path)
384                .join("src/lib.rs"))
385        }
386    }
387
388    /// Find the mecha10 framework root directory
389    fn find_framework_root(&self) -> Result<PathBuf> {
390        // First try MECHA10_FRAMEWORK_PATH environment variable
391        if let Ok(framework_path) = std::env::var("MECHA10_FRAMEWORK_PATH") {
392            let path = PathBuf::from(framework_path);
393            if path.exists() {
394                return Ok(path);
395            }
396        }
397
398        // Check if we're already in the framework root (has packages/nodes directory)
399        if self.project_root.join(paths::framework::NODES_DIR).exists() {
400            return Ok(self.project_root.clone());
401        }
402
403        // Walk up from project root to find workspace with packages/nodes
404        let mut current = self.project_root.clone();
405        loop {
406            if current.join(paths::framework::NODES_DIR).exists() {
407                return Ok(current);
408            }
409
410            match current.parent() {
411                Some(parent) => current = parent.to_path_buf(),
412                None => break,
413            }
414        }
415
416        // Fallback to project root
417        Ok(self.project_root.clone())
418    }
419
420    /// Parse node source file for topic definitions and usage
421    async fn parse_node_source(&self, source_path: &Path) -> Result<(Vec<TopicRef>, Vec<TopicRef>)> {
422        let content = tokio::fs::read_to_string(source_path)
423            .await
424            .context(format!("Failed to read source file: {}", source_path.display()))?;
425
426        // Extract topic constants and their types
427        let topic_defs = self.extract_topic_definitions(&content);
428
429        // Find publish calls
430        let publishes = self.extract_publish_calls(&content, &topic_defs);
431
432        // Find subscribe calls
433        let subscribes = self.extract_subscribe_calls(&content, &topic_defs);
434
435        Ok((publishes, subscribes))
436    }
437
438    /// Extract topic constant definitions from source
439    ///
440    /// Note: This method is public primarily for testing purposes.
441    pub fn extract_topic_definitions(&self, content: &str) -> HashMap<String, TopicRef> {
442        let mut topics = HashMap::new();
443
444        // Match: pub const TOPIC_NAME: Topic<MessageType> = Topic::new("/path");
445        let topic_pattern =
446            Regex::new(r#"pub\s+const\s+([A-Z_]+):\s*Topic<([^>]+)>\s*=\s*Topic::new\("([^"]+)"\)"#).unwrap();
447
448        for caps in topic_pattern.captures_iter(content) {
449            let const_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
450            let message_type = caps.get(2).map(|m| m.as_str().trim()).unwrap_or("");
451            let topic_path = caps.get(3).map(|m| m.as_str()).unwrap_or("");
452
453            topics.insert(
454                const_name.to_string(),
455                TopicRef {
456                    path: topic_path.to_string(),
457                    message_type: Some(message_type.to_string()),
458                },
459            );
460        }
461
462        topics
463    }
464
465    /// Extract publish_to calls
466    ///
467    /// Note: This method is public primarily for testing purposes.
468    pub fn extract_publish_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
469        let mut publishes = Vec::new();
470        let mut seen = HashSet::new();
471
472        // Match: ctx.publish_to(TOPIC_NAME, ...)
473        // or: publish_to(TOPIC_NAME, ...)
474        // or: ctx.publish_to(topics::TOPIC_NAME, ...)
475        // Use a more flexible pattern that handles whitespace and optional module prefix
476        let publish_pattern = Regex::new(r"publish_to\s*\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*,").unwrap();
477
478        for caps in publish_pattern.captures_iter(content) {
479            if let Some(const_name) = caps.get(1) {
480                let const_name = const_name.as_str();
481                if let Some(topic) = topic_defs.get(const_name) {
482                    if seen.insert(topic.path.clone()) {
483                        publishes.push(topic.clone());
484                    }
485                }
486            }
487        }
488
489        publishes
490    }
491
492    /// Extract subscribe calls
493    ///
494    /// Note: This method is public primarily for testing purposes.
495    pub fn extract_subscribe_calls(&self, content: &str, topic_defs: &HashMap<String, TopicRef>) -> Vec<TopicRef> {
496        let mut subscribes = Vec::new();
497        let mut seen = HashSet::new();
498
499        // Match: ctx.subscribe::<MessageType>(TOPIC_NAME)
500        // or: subscribe(TOPIC_NAME)
501        // or: ctx.subscribe::<MessageType>(topics::TOPIC_NAME)
502        // Use a more flexible pattern that handles whitespace, generics, and optional module prefix
503        let subscribe_pattern =
504            Regex::new(r"subscribe\s*(?:::\s*<[^>]+>\s*)?\(\s*(?:[a-z_]+::)?([A-Z_][A-Z0-9_]*)\s*\)").unwrap();
505
506        for caps in subscribe_pattern.captures_iter(content) {
507            if let Some(const_name) = caps.get(1) {
508                let const_name = const_name.as_str();
509                if let Some(topic) = topic_defs.get(const_name) {
510                    if seen.insert(topic.path.clone()) {
511                        subscribes.push(topic.clone());
512                    }
513                }
514            }
515        }
516
517        subscribes
518    }
519
520    /// Build topic-centric view from node-centric data
521    fn build_topic_view(&self, nodes: &[NodeTopology]) -> Vec<TopicTopology> {
522        let mut topic_map: HashMap<String, TopicTopology> = HashMap::new();
523
524        for node in nodes {
525            // Add publishers
526            for topic_ref in &node.publishes {
527                let topic = topic_map
528                    .entry(topic_ref.path.clone())
529                    .or_insert_with(|| TopicTopology {
530                        path: topic_ref.path.clone(),
531                        message_type: topic_ref.message_type.clone(),
532                        publishers: Vec::new(),
533                        subscribers: Vec::new(),
534                    });
535                if !topic.publishers.contains(&node.name) {
536                    topic.publishers.push(node.name.clone());
537                }
538            }
539
540            // Add subscribers
541            for topic_ref in &node.subscribes {
542                let topic = topic_map
543                    .entry(topic_ref.path.clone())
544                    .or_insert_with(|| TopicTopology {
545                        path: topic_ref.path.clone(),
546                        message_type: topic_ref.message_type.clone(),
547                        publishers: Vec::new(),
548                        subscribers: Vec::new(),
549                    });
550                if !topic.subscribers.contains(&node.name) {
551                    topic.subscribers.push(node.name.clone());
552                }
553            }
554        }
555
556        let mut topics: Vec<TopicTopology> = topic_map.into_values().collect();
557        topics.sort_by(|a, b| a.path.cmp(&b.path));
558        topics
559    }
560
561    /// Parse topics from JSON config value
562    ///
563    /// This is a helper method extracted for testing. It contains the core parsing logic
564    /// from `load_topics_from_config` but operates on a JSON value instead of a file path.
565    fn parse_topics_from_json(&self, config: &serde_json::Value) -> (Vec<TopicRef>, Vec<TopicRef>) {
566        let mut publishes = Vec::new();
567        let mut subscribes = Vec::new();
568
569        // Try Format 1: Array format (topics.publishes/topics.subscribes)
570        if let Some(topics) = config.get("topics").and_then(|v| v.as_object()) {
571            // Check if using array format
572            if topics.contains_key("publishes") || topics.contains_key("subscribes") {
573                // Format 1: Array format
574                if let Some(pubs) = topics.get("publishes").and_then(|v| v.as_array()) {
575                    for topic in pubs {
576                        if let Some(topic_obj) = topic.as_object() {
577                            for (_semantic_name, topic_path) in topic_obj {
578                                if let Some(path_str) = topic_path.as_str() {
579                                    publishes.push(TopicRef {
580                                        path: path_str.to_string(),
581                                        message_type: None,
582                                    });
583                                }
584                            }
585                        }
586                    }
587                }
588
589                if let Some(subs) = topics.get("subscribes").and_then(|v| v.as_array()) {
590                    for topic in subs {
591                        if let Some(topic_obj) = topic.as_object() {
592                            for (_semantic_name, topic_path) in topic_obj {
593                                if let Some(path_str) = topic_path.as_str() {
594                                    subscribes.push(TopicRef {
595                                        path: path_str.to_string(),
596                                        message_type: None,
597                                    });
598                                }
599                            }
600                        }
601                    }
602                }
603            } else {
604                // Format 2: Flat format - infer pub/sub from field names
605                for (field_name, topic_path) in topics {
606                    if let Some(path_str) = topic_path.as_str() {
607                        // Classify based on field name suffix
608                        if field_name.ends_with("_in") || field_name.ends_with("_sub") || field_name.contains("input") {
609                            subscribes.push(TopicRef {
610                                path: path_str.to_string(),
611                                message_type: None,
612                            });
613                        } else if field_name.ends_with("_out")
614                            || field_name.ends_with("_pub")
615                            || field_name.contains("output")
616                        {
617                            publishes.push(TopicRef {
618                                path: path_str.to_string(),
619                                message_type: None,
620                            });
621                        }
622                        // If field name doesn't match patterns, skip it (might be control/status)
623                    }
624                }
625            }
626        }
627
628        // Try Format 3: Root-level topic fields (fallback)
629        if publishes.is_empty() && subscribes.is_empty() {
630            // Check for input_topic, output_topic, control_topic pattern
631            if let Some(input) = config.get("input_topic").and_then(|v| v.as_str()) {
632                subscribes.push(TopicRef {
633                    path: input.to_string(),
634                    message_type: None,
635                });
636            }
637            if let Some(output) = config.get("output_topic").and_then(|v| v.as_str()) {
638                publishes.push(TopicRef {
639                    path: output.to_string(),
640                    message_type: None,
641                });
642            }
643            if let Some(control) = config.get("control_topic").and_then(|v| v.as_str()) {
644                subscribes.push(TopicRef {
645                    path: control.to_string(),
646                    message_type: None,
647                });
648            }
649        }
650
651        (publishes, subscribes)
652    }
653}
654
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Service rooted at a throwaway path; `parse_topics_from_json` never reads it.
    fn create_test_service() -> TopologyService {
        TopologyService::new(PathBuf::from("/tmp/test"))
    }

    /// Project the topic paths out of a list of topic references, preserving order.
    fn paths_of(topics: &[TopicRef]) -> Vec<&str> {
        topics.iter().map(|topic| topic.path.as_str()).collect()
    }

    /// Array format: entries are read in document order from both lists.
    #[test]
    fn test_parse_array_format() {
        let config: serde_json::Value = serde_json::json!({
            "topics": {
                "publishes": [
                    { "output": "/vision/classification" },
                    { "status": "/motor/status" }
                ],
                "subscribes": [
                    { "input": "/camera/rgb" }
                ]
            }
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        assert_eq!(paths_of(&publishes), vec!["/vision/classification", "/motor/status"]);
        assert_eq!(paths_of(&subscribes), vec!["/camera/rgb"]);
    }

    /// Flat format: direction is inferred from the `_in` / `_out` suffixes.
    #[test]
    fn test_parse_flat_format() {
        let config: serde_json::Value = serde_json::json!({
            "topics": {
                "command_in": "/ai/command",
                "response_out": "/ai/response",
                "camera_in": "/camera/rgb",
                "nav_goal_out": "/nav/goal",
                "motor_cmd_out": "/motor/cmd_vel",
                "behavior_out": "/behavior/execute"
            }
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        // Four `_out` fields become publishes (order is not asserted).
        let published = paths_of(&publishes);
        assert_eq!(published.len(), 4);
        for expected in ["/ai/response", "/nav/goal", "/motor/cmd_vel", "/behavior/execute"].iter() {
            assert!(published.contains(expected));
        }

        // Two `_in` fields become subscribes (order is not asserted).
        let subscribed = paths_of(&subscribes);
        assert_eq!(subscribed.len(), 2);
        for expected in ["/ai/command", "/camera/rgb"].iter() {
            assert!(subscribed.contains(expected));
        }
    }

    /// Legacy root-level fields: input and control are subscribes, output is a publish.
    #[test]
    fn test_parse_root_level_format() {
        let config: serde_json::Value = serde_json::json!({
            "input_topic": "/robot/sensors/camera/rgb",
            "output_topic": "/inference/detections",
            "control_topic": "/inference/cmd"
        });

        let (publishes, subscribes) = create_test_service().parse_topics_from_json(&config);

        assert_eq!(paths_of(&publishes), vec!["/inference/detections"]);

        let subscribed = paths_of(&subscribes);
        assert_eq!(subscribed.len(), 2);
        assert!(subscribed.contains(&"/robot/sensors/camera/rgb"));
        assert!(subscribed.contains(&"/inference/cmd"));
    }
}
734}