mecha10_core/context/
mod.rs

1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
6//! This module has been refactored into smaller sub-modules for better organization:
7//! - `infra_config`: Infrastructure configuration types
8//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27// Import the complete Context struct and all implementations from the backup file
28// This maintains backward compatibility while improving code organization
29
30pub struct Context {
31    /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
32    node_id: String,
33
34    /// Optional instance name
35    instance: Option<String>,
36
37    /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
38    #[cfg(feature = "messaging")]
39    message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,
40
41    /// Control plane message bus for robot-to-cloud communication (optional)
42    #[cfg(feature = "messaging")]
43    control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,
44
45    /// Redis bridge for forwarding messages between local and control plane
46    #[cfg(feature = "messaging")]
47    redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,
48
49    /// Shutdown signal
50    shutdown: Arc<RwLock<bool>>,
51
52    /// State manager for persistent node state (lazy initialized)
53    state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
54}
55impl Clone for Context {
56    fn clone(&self) -> Self {
57        Self {
58            node_id: self.node_id.clone(),
59            instance: self.instance.clone(),
60            #[cfg(feature = "messaging")]
61            message_bus: Arc::clone(&self.message_bus),
62            #[cfg(feature = "messaging")]
63            control_plane_bus: Arc::clone(&self.control_plane_bus),
64            #[cfg(feature = "messaging")]
65            redis_bridge: Arc::clone(&self.redis_bridge),
66            shutdown: Arc::clone(&self.shutdown),
67            state_manager: Arc::clone(&self.state_manager),
68        }
69    }
70}
71impl Context {
72    /// Create a new context
73    ///
74    /// # Arguments
75    ///
76    /// * `node_id` - Unique identifier for this node
77    ///
78    /// # Example
79    ///
80    /// ```rust
81    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82    /// let ctx = Context::new("my-node").await?;
83    /// # Ok(())
84    /// # }
85    /// ```
86    pub async fn new(node_id: &str) -> Result<Self> {
87        #[cfg(feature = "messaging")]
88        let (message_bus, control_plane_bus, redis_bridge) = {
89            // Load infrastructure config
90            let config = Self::load_infrastructure_config()?;
91
92            // Connect to local Redis
93            let redis_url = Self::get_redis_url_from_config(&config)?;
94            let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
95                .await
96                .with_context(|| {
97                    format!(
98                        "Failed to connect local message bus for node '{}' to Redis at '{}'",
99                        node_id, redis_url
100                    )
101                })
102                .map_err(|e| Mecha10Error::MessagingError {
103                    message: format!("{:#}", e),
104                    suggestion: "Ensure local Redis is running at the configured URL".to_string(),
105                })?;
106
107            // Connect to control plane Redis if configured
108            let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
109                let cp_url = &cp_config.url;
110                match mecha10_messaging::MessageBus::connect(cp_url, node_id).await {
111                    Ok(bus) => {
112                        tracing::info!(
113                            "Connected to control plane Redis at '{}' for node '{}'",
114                            cp_url,
115                            node_id
116                        );
117                        Some(bus)
118                    }
119                    Err(e) => {
120                        tracing::warn!(
121                            "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
122                            cp_url,
123                            e
124                        );
125                        None
126                    }
127                }
128            } else {
129                None
130            };
131
132            // Initialize Redis bridge if both connections are available
133            let bridge = if let Some(_cp_bus_ref) = &cp_bus {
134                // Get bridge configuration (use default if not specified)
135                let bridge_config = config.bridge.unwrap_or_default();
136
137                if bridge_config.enabled {
138                    // Create bridge with separate clones of the buses
139                    let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
140                        .await
141                        .map_err(|e| Mecha10Error::MessagingError {
142                            message: format!("Failed to create bridge connection to local Redis: {}", e),
143                            suggestion: "Check local Redis connection".to_string(),
144                        })?;
145
146                    let cp_url = config
147                        .control_plane_redis
148                        .as_ref()
149                        .map(|c| c.url.clone())
150                        .unwrap_or_default();
151                    let bridge_cp_bus =
152                        mecha10_messaging::MessageBus::connect(&cp_url, node_id)
153                            .await
154                            .map_err(|e| Mecha10Error::MessagingError {
155                                message: format!("Failed to create bridge connection to control plane Redis: {}", e),
156                                suggestion: "Check control plane Redis connection".to_string(),
157                            })?;
158
159                    // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
160                    let redis_bridge_config = crate::redis_bridge::BridgeConfig {
161                        enabled: bridge_config.enabled,
162                        local_topics: bridge_config.local_topics,
163                        bridged_topics: bridge_config.bridged_topics,
164                        robot_id: bridge_config.robot_id,
165                    };
166
167                    let bridge =
168                        crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
169
170                    // Start the bridge
171                    if let Err(e) = bridge.start().await {
172                        tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
173                        None
174                    } else {
175                        tracing::info!("Redis bridge started successfully");
176                        Some(bridge)
177                    }
178                } else {
179                    tracing::debug!("Redis bridge is disabled in configuration");
180                    None
181                }
182            } else {
183                None
184            };
185
186            (local_bus, cp_bus, bridge)
187        };
188
189        Ok(Self {
190            node_id: node_id.to_string(),
191            instance: None,
192            #[cfg(feature = "messaging")]
193            message_bus: Arc::new(RwLock::new(message_bus)),
194            #[cfg(feature = "messaging")]
195            control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
196            #[cfg(feature = "messaging")]
197            redis_bridge: Arc::new(RwLock::new(redis_bridge)),
198            shutdown: Arc::new(RwLock::new(false)),
199            state_manager: Arc::new(RwLock::new(None)),
200        })
201    }
202
203    /// Set an instance name for this context (builder pattern)
204    ///
205    /// This allows the same node implementation to run multiple instances
206    /// with different names. The node_id becomes "{node_name}/{instance}".
207    ///
208    /// # Example
209    ///
210    /// ```rust
211    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
212    /// // Single instance (no instance name)
213    /// let ctx = Context::new("camera_fake").await?;
214    /// // node_id: "camera_fake"
215    ///
216    /// // Multi-instance (with instance names)
217    /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
218    /// // node_id: "camera_fake/left"
219    ///
220    /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
221    /// // node_id: "camera_fake/right"
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn with_instance(mut self, instance: &str) -> Self {
226        self.instance = Some(instance.to_string());
227        self.node_id = format!("{}/{}", self.node_id, instance);
228        self
229    }
230
231    /// Get the instance name if set
232    ///
233    /// Returns `None` for single-instance nodes, or `Some(instance)` for
234    /// multi-instance nodes.
235    pub fn instance(&self) -> Option<&str> {
236        self.instance.as_deref()
237    }
238
239    /// Load infrastructure configuration based on environment
240    ///
241    /// Priority order:
242    /// 1. mecha10.json (project configuration) - redis.url field
243    /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
244    /// 3. config/infrastructure.<environment>.yaml
245    ///
246    /// Set via MECHA10_ENVIRONMENT environment variable.
247    fn load_infrastructure_config() -> Result<InfrastructureConfig> {
248        use crate::config::load_config;
249        use std::env;
250        use std::path::PathBuf;
251
252        // 1. Try loading from mecha10.json FIRST (project configuration)
253        if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
254            return Ok(InfrastructureConfig {
255                redis: crate::context::infra_config::RedisConfig {
256                    url: redis_url,
257                    max_connections: 10,
258                    connection_timeout_ms: 5000,
259                },
260                control_plane_redis: None,
261                bridge: None,
262                postgres: None,
263                mlflow: None,
264                minio: None,
265            });
266        }
267
268        // 2. Check for environment variable overrides
269        // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
270        if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
271            return Ok(InfrastructureConfig {
272                redis: crate::context::infra_config::RedisConfig {
273                    url: redis_url,
274                    max_connections: 10,
275                    connection_timeout_ms: 5000,
276                },
277                control_plane_redis: None,
278                bridge: None,
279                postgres: None,
280                mlflow: None,
281                minio: None,
282            });
283        }
284
285        // 3. Load from infrastructure.yaml
286        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
287
288        let config_filename = format!("infrastructure.{}.yaml", environment);
289
290        // Try multiple paths in order:
291        // 1. Current directory (for running from workspace root)
292        // 2. Workspace root (for running tests from target/debug/deps)
293        // 3. Two levels up (for running tests from packages/core/target/debug/deps)
294        let paths_to_try = vec![
295            PathBuf::from(format!("config/{}", config_filename)),
296            PathBuf::from(format!("../../config/{}", config_filename)),
297            PathBuf::from(format!("../../../../config/{}", config_filename)),
298        ];
299
300        for path in &paths_to_try {
301            if path.exists() {
302                return load_config(path)
303                    .with_context(|| {
304                        format!(
305                            "Failed to load infrastructure config for environment '{}' from '{}'",
306                            environment,
307                            path.display()
308                        )
309                    })
310                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
311            }
312        }
313
314        // If no config file found, return error with helpful message
315        Err(Mecha10Error::Configuration(format!(
316            "Failed to load infrastructure config for environment '{}'. \
317             Tried paths: {}. \
318             Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
319             or configure redis.url in mecha10.json.",
320            environment,
321            paths_to_try
322                .iter()
323                .map(|p| p.display().to_string())
324                .collect::<Vec<_>>()
325                .join(", ")
326        )))
327    }
328
329    /// Try to load Redis URL from mecha10.json
330    ///
331    /// Looks for mecha10.json in current directory and reads redis.url field.
332    fn try_load_redis_from_mecha10_json() -> Result<String> {
333        use std::path::PathBuf;
334
335        // Try multiple paths to find mecha10.json
336        let paths_to_try = vec![
337            PathBuf::from("mecha10.json"),
338            PathBuf::from("../mecha10.json"),
339            PathBuf::from("../../mecha10.json"),
340        ];
341
342        for path in &paths_to_try {
343            if !path.exists() {
344                continue;
345            }
346
347            // Read and parse mecha10.json
348            let content = std::fs::read_to_string(path)
349                .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
350
351            let json: serde_json::Value = serde_json::from_str(&content)
352                .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
353
354            // Extract redis.url
355            if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
356                return Ok(redis_url.to_string());
357            }
358        }
359
360        Err(Mecha10Error::Configuration(
361            "No mecha10.json found with redis.url configuration".to_string(),
362        ))
363    }
364
365    /// Get Redis URL from infrastructure configuration (helper for internal use)
366    fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
367        // First try environment variables (for backward compatibility and testing)
368        if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
369            return Ok(url);
370        }
371        if let Ok(url) = std::env::var("REDIS_URL") {
372            return Ok(url);
373        }
374
375        // Use config
376        Ok(config.redis.url.clone())
377    }
378
379    /// Get Redis URL from infrastructure configuration
380    pub fn get_redis_url() -> Result<String> {
381        let config = Self::load_infrastructure_config()?;
382        Self::get_redis_url_from_config(&config)
383    }
384
385    /// Get the node ID
386    pub fn node_id(&self) -> &str {
387        &self.node_id
388    }
389
390    /// Load node configuration with environment-aware file-level fallback
391    ///
392    /// This method implements the V2 configuration system with:
393    /// - Environment-specific configs (dev/staging/production)
394    /// - File-level fallback (no key-by-key merging)
395    /// - Automatic path resolution
396    ///
397    /// **Fallback Priority:**
398    /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
399    /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
400    /// 3. `T::default()` (code default)
401    ///
402    /// **Environment Detection:**
403    /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
404    /// - Supported values: "dev", "staging", "production", or custom
405    ///
406    /// # Type Parameters
407    ///
408    /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
409    ///
410    /// # Arguments
411    ///
412    /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
413    ///
414    /// # Returns
415    ///
416    /// The loaded configuration of type `T`, using the first available source
417    /// in the fallback chain.
418    ///
419    /// # Example
420    ///
421    /// ```rust
422    /// use mecha10::prelude::*;
423    /// use serde::{Deserialize, Serialize};
424    ///
425    /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
426    /// struct SimBridgeConfig {
427    ///     camera_width: u32,
428    ///     camera_height: u32,
429    ///     camera_fps: u32,
430    /// }
431    ///
432    /// # async fn example(ctx: &Context) -> Result<()> {
433    /// // Loads from configs/{env}/nodes/simulation-bridge/config.json
434    /// // Falls back to configs/common/nodes/simulation-bridge/config.json
435    /// // Falls back to SimBridgeConfig::default()
436    /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
437    /// # Ok(())
438    /// # }
439    /// ```
440    pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
441    where
442        T: DeserializeOwned + Default,
443    {
444        use crate::config::load_config;
445        use std::env;
446        use std::path::PathBuf;
447
448        // Get environment from env var (default: "dev")
449        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
450
451        // Try multiple base paths (for running from different locations)
452        let base_paths = vec![
453            PathBuf::from("."),           // Current directory
454            PathBuf::from("../.."),       // From target/debug/deps
455            PathBuf::from("../../../.."), // From packages/*/target/debug/deps
456        ];
457
458        // Build fallback chain for each base path
459        for base_path in &base_paths {
460            // 1. Try environment-specific config
461            let env_config_path = base_path
462                .join("configs")
463                .join(&environment)
464                .join("nodes")
465                .join(node_name)
466                .join("config.json");
467
468            if env_config_path.exists() {
469                tracing::debug!(
470                    "Loading node config for '{}' from environment-specific path: {}",
471                    node_name,
472                    env_config_path.display()
473                );
474                return load_config(&env_config_path)
475                    .with_context(|| {
476                        format!(
477                            "Failed to load node config for '{}' from '{}'",
478                            node_name,
479                            env_config_path.display()
480                        )
481                    })
482                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
483            }
484
485            // 2. Try common config
486            let common_config_path = base_path
487                .join("configs")
488                .join("common")
489                .join("nodes")
490                .join(node_name)
491                .join("config.json");
492
493            if common_config_path.exists() {
494                tracing::debug!(
495                    "Loading node config for '{}' from common path: {}",
496                    node_name,
497                    common_config_path.display()
498                );
499                return load_config(&common_config_path)
500                    .with_context(|| {
501                        format!(
502                            "Failed to load node config for '{}' from '{}'",
503                            node_name,
504                            common_config_path.display()
505                        )
506                    })
507                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
508            }
509
510            // 3. Try new flat structure: configs/nodes/@mecha10/{node_name}/config.json
511            let flat_config_path = base_path
512                .join("configs")
513                .join("nodes")
514                .join("@mecha10")
515                .join(node_name)
516                .join("config.json");
517
518            if flat_config_path.exists() {
519                tracing::debug!(
520                    "Loading node config for '{}' from flat path: {}",
521                    node_name,
522                    flat_config_path.display()
523                );
524                return load_config(&flat_config_path)
525                    .with_context(|| {
526                        format!(
527                            "Failed to load node config for '{}' from '{}'",
528                            node_name,
529                            flat_config_path.display()
530                        )
531                    })
532                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
533            }
534        }
535
536        // 3. Fall back to default
537        tracing::debug!(
538            "No config file found for node '{}', using Default implementation",
539            node_name
540        );
541        Ok(T::default())
542    }
543
544    /// Check if control plane Redis connection is available
545    #[cfg(feature = "messaging")]
546    pub async fn has_control_plane(&self) -> bool {
547        self.control_plane_bus.read().await.is_some()
548    }
549
550    /// Get control plane Redis URL from infrastructure configuration
551    pub fn get_control_plane_redis_url() -> Result<Option<String>> {
552        let config = Self::load_infrastructure_config()?;
553        Ok(config.control_plane_redis.map(|c| c.url))
554    }
555
556    /// Subscribe to a topic with type-safe message handling
557    ///
558    /// # Arguments
559    ///
560    /// * `topic` - Type-safe topic to subscribe to
561    ///
562    /// # Returns
563    ///
564    /// A receiver that will yield messages of type `T`
565    ///
566    /// # Example
567    ///
568    /// ```rust
569    /// use mecha10::prelude::*;
570    /// use mecha10::topics::sensor;
571    ///
572    /// # async fn example(ctx: &Context) -> Result<()> {
573    /// // Type is inferred as Receiver<Image>
574    /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
575    ///
576    /// while let Some(image) = images.recv().await {
577    ///     println!("Image: {}x{}", image.width, image.height);
578    /// }
579    /// # Ok(())
580    /// # }
581    /// ```
582    #[cfg(feature = "messaging")]
583    pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
584    where
585        T: Message + DeserializeOwned + Send + 'static,
586    {
587        let mut bus = self.message_bus.write().await;
588
589        // Create a unique consumer group for this node
590        let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
591
592        let subscriber = bus
593            .subscribe::<T>(topic.path(), &consumer_group)
594            .await
595            .with_context(|| {
596                format!(
597                    "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
598                    topic.path(),
599                    consumer_group,
600                    self.node_id
601                )
602            })
603            .map_err(|e| Mecha10Error::MessagingError {
604                message: format!("{:#}", e),
605                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
606            })?;
607
608        // Convert to our Receiver type
609        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
610
611        // Spawn task to forward messages
612        tokio::spawn(async move {
613            let mut sub = subscriber;
614            while let Some(msg) = sub.recv().await {
615                // Auto-acknowledge message before moving payload
616                if let Err(e) = msg.ack().await {
617                    tracing::warn!(
618                        topic = %msg.topic,
619                        message_id = %msg.id,
620                        error = %e,
621                        "Failed to acknowledge message (will be redelivered on reconnect)"
622                    );
623                }
624                // Extract payload and forward
625                if tx.send(msg.payload).is_err() {
626                    break; // Receiver dropped
627                }
628            }
629        });
630
631        Ok(Receiver::from(rx))
632    }
633
634    /// Publish a message to a specific topic
635    ///
636    /// # Arguments
637    ///
638    /// * `topic` - Type-safe topic to publish to
639    /// * `message` - Message to publish (type checked against topic)
640    ///
641    /// # Example
642    ///
643    /// ```rust
644    /// use mecha10::prelude::*;
645    /// use mecha10::topics::sensor;
646    /// use mecha10::messages::Image;
647    ///
648    /// # async fn example(ctx: &Context) -> Result<()> {
649    /// let image = Image {
650    ///     timestamp: now_micros(),
651    ///     width: 640,
652    ///     height: 480,
653    ///     encoding: "rgb8".to_string(),
654    ///     data: vec![],
655    /// };
656    ///
657    /// // Type checked: image must be of type Image
658    /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
659    /// # Ok(())
660    /// # }
661    /// ```
662    #[cfg(feature = "messaging")]
663    pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
664    where
665        T: Message + Serialize,
666    {
667        let mut bus = self.message_bus.write().await;
668
669        bus.publish(topic.path(), message)
670            .await
671            .with_context(|| {
672                format!(
673                    "Failed to publish to topic '{}' from node '{}'",
674                    topic.path(),
675                    self.node_id
676                )
677            })
678            .map_err(|e| Mecha10Error::MessagingError {
679                message: format!("{:#}", e),
680                suggestion: "Check that Redis is running and accepting connections".to_string(),
681            })?;
682
683        Ok(())
684    }
685
686    /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
687    ///
688    /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
689    ///
690    /// # Arguments
691    /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
692    /// * `message` - The message to publish
693    pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
694    where
695        T: Message + Serialize,
696    {
697        let mut bus = self.message_bus.write().await;
698
699        bus.publish(topic_path, message)
700            .await
701            .with_context(|| {
702                format!(
703                    "Failed to publish to topic '{}' from node '{}'",
704                    topic_path, self.node_id
705                )
706            })
707            .map_err(|e| Mecha10Error::MessagingError {
708                message: format!("{:#}", e),
709                suggestion: "Check that Redis is running and accepting connections".to_string(),
710            })?;
711
712        Ok(())
713    }
714
715    /// Publish a message to an instance-scoped topic
716    ///
717    /// This automatically appends the context's instance name to the topic path.
718    /// If no instance is set, this behaves like `publish_to()`.
719    ///
720    /// # Arguments
721    ///
722    /// * `topic` - Type-safe topic to publish to
723    /// * `message` - Message to publish (type checked against topic)
724    ///
725    /// # Example
726    ///
727    /// ```rust
728    /// use mecha10::prelude::*;
729    /// use mecha10::topics::sensor;
730    /// use mecha10::messages::Image;
731    ///
732    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
733    /// let ctx = Context::new("camera_fake").await?.with_instance("left");
734    ///
735    /// let image = Image {
736    ///     timestamp: now_micros(),
737    ///     width: 640,
738    ///     height: 480,
739    ///     encoding: "rgb8".to_string(),
740    ///     data: vec![],
741    /// };
742    ///
743    /// // Publishes to "/sensor/camera/rgb/left"
744    /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
745    /// # Ok(())
746    /// # }
747    /// ```
748    #[cfg(feature = "messaging")]
749    pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
750    where
751        T: Message + Serialize,
752    {
753        let scoped_path = if let Some(instance) = &self.instance {
754            format!("{}/{}", topic.path(), instance)
755        } else {
756            topic.path().to_string()
757        };
758
759        self.publish_raw(&scoped_path, message).await
760    }
761
762    /// Publish a message to a topic with an explicit instance suffix
763    ///
764    /// This allows publishing to a specific instance's topic regardless of
765    /// the context's own instance name.
766    ///
767    /// # Arguments
768    ///
769    /// * `topic` - Type-safe topic to publish to
770    /// * `instance` - Instance name to append to topic path
771    /// * `message` - Message to publish (type checked against topic)
772    ///
773    /// # Example
774    ///
775    /// ```rust
776    /// use mecha10::prelude::*;
777    /// use mecha10::topics::actuator;
778    ///
779    /// # async fn example(ctx: &Context) -> Result<()> {
780    /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
781    ///
782    /// // Send to specific motor instances
783    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
784    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
785    /// # Ok(())
786    /// # }
787    /// ```
788    #[cfg(feature = "messaging")]
789    pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
790    where
791        T: Message + Serialize,
792    {
793        let instance_path = format!("{}/{}", topic.path(), instance);
794        self.publish_raw(&instance_path, message).await
795    }
796
797    /// Subscribe to an instance-scoped topic
798    ///
799    /// This automatically appends the context's instance name to the topic path.
800    /// If no instance is set, this behaves like `subscribe()`.
801    ///
802    /// # Example
803    ///
804    /// ```rust
805    /// use mecha10::prelude::*;
806    /// use mecha10::topics::sensor;
807    ///
808    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
809    /// let ctx = Context::new("vision_node").await?;
810    ///
811    /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
812    /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
813    ///
814    /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
815    /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
816    /// # Ok(())
817    /// # }
818    /// ```
819    #[cfg(feature = "messaging")]
820    pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
821    where
822        T: Message + DeserializeOwned + Send + 'static,
823    {
824        let scoped_path = format!("{}/{}", topic.path(), instance);
825        self.subscribe_raw(&scoped_path).await
826    }
827
828    /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
829    ///
830    /// # Arguments
831    ///
832    /// * `topic_path` - Raw topic path string
833    ///
834    /// # Returns
835    ///
836    /// A receiver that will yield messages of type `T`
837    ///
838    /// # Note
839    ///
840    /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
841    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
842    #[cfg(feature = "messaging")]
843    pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
844    where
845        T: Message + DeserializeOwned + Send + 'static,
846    {
847        let mut bus = self.message_bus.write().await;
848
849        // Create a unique consumer group for this node
850        let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
851
852        let subscriber = bus
853            .subscribe::<T>(topic_path, &consumer_group)
854            .await
855            .with_context(|| {
856                format!(
857                    "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
858                    topic_path, consumer_group, self.node_id
859                )
860            })
861            .map_err(|e| Mecha10Error::MessagingError {
862                message: format!("{:#}", e),
863                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
864            })?;
865
866        // Convert to our Receiver type
867        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
868
869        // Spawn task to forward messages
870        tokio::spawn(async move {
871            let mut sub = subscriber;
872            while let Some(msg) = sub.recv().await {
873                // Auto-acknowledge message before moving payload
874                if let Err(e) = msg.ack().await {
875                    tracing::warn!(
876                        topic = %msg.topic,
877                        message_id = %msg.id,
878                        error = %e,
879                        "Failed to acknowledge message (will be redelivered on reconnect)"
880                    );
881                }
882                // Extract payload and forward
883                if tx.send(msg.payload).is_err() {
884                    break; // Receiver dropped
885                }
886            }
887        });
888
889        Ok(Receiver::from(rx))
890    }
891
892    /// Discover topics matching a glob pattern
893    ///
894    /// Queries Redis for all topics matching the given pattern without subscribing.
895    /// Useful for inspecting available topics before subscribing.
896    ///
897    /// # Pattern Syntax
898    ///
899    /// - `*` matches any sequence of characters within a path segment
900    /// - `**` matches any sequence of path segments
901    /// - `{a,b}` matches either a or b (future enhancement)
902    ///
903    /// # Arguments
904    ///
905    /// * `pattern` - Glob pattern to match topics against
906    ///
907    /// # Returns
908    ///
909    /// A vector of topic paths that match the pattern
910    ///
911    /// # Example
912    ///
913    /// ```rust,no_run
914    /// use mecha10::prelude::*;
915    ///
916    /// # async fn example(ctx: &Context) -> Result<()> {
917    /// // Discover all camera topics
918    /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
919    /// println!("Found {} camera topics: {:?}", topics.len(), topics);
920    /// # Ok(())
921    /// # }
922    /// ```
923    #[cfg(feature = "messaging")]
924    pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
925        use crate::aggregated::pattern::convert_glob_to_redis;
926
927        let bus = self.message_bus.read().await;
928        let redis_pattern = convert_glob_to_redis(pattern);
929
930        bus.discover_topics(&redis_pattern)
931            .await
932            .with_context(|| {
933                format!(
934                    "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
935                    pattern, redis_pattern, self.node_id
936                )
937            })
938            .map_err(|e| Mecha10Error::MessagingError {
939                message: format!("{:#}", e),
940                suggestion: "Check that Redis is running and the pattern is valid".to_string(),
941            })
942    }
943
944    /// Subscribe to multiple topics matching a glob pattern
945    ///
946    /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
947    /// Each message is tagged with its source topic path.
948    ///
949    /// # Pattern Syntax
950    ///
951    /// - `*` matches any sequence of characters within a path segment
952    /// - `**` matches any sequence of path segments
953    /// - `{a,b}` matches either a or b (future enhancement)
954    ///
955    /// # Arguments
956    ///
957    /// * `pattern` - Glob pattern to match topics against
958    ///
959    /// # Returns
960    ///
961    /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
962    ///
963    /// # Examples
964    ///
965    /// ```rust,no_run
966    /// use mecha10::prelude::*;
967    ///
968    /// # async fn example(ctx: &Context) -> Result<()> {
969    /// // Subscribe to all cameras with one call
970    /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
971    ///
972    /// while let Some((topic, image)) = cameras.recv().await {
973    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
974    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
975    /// }
976    /// # Ok(())
977    /// # }
978    /// ```
979    #[cfg(feature = "messaging")]
980    pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
981    where
982        T: Message + DeserializeOwned + Send + 'static,
983    {
984        // 1. Discover all topics matching pattern
985        let matching_topics = self.discover_topics(pattern).await?;
986
987        if matching_topics.is_empty() {
988            return Err(Mecha10Error::MessagingError {
989                message: format!("No topics found matching pattern: {}", pattern),
990                suggestion: "Check that the pattern is correct and topics exist".to_string(),
991            });
992        }
993
994        // 2. Subscribe to each topic
995        let mut receivers = Vec::new();
996        for topic_path in matching_topics {
997            let rx = self.subscribe_raw::<T>(&topic_path).await?;
998            // Extract receiver and wrap in ReceiverType enum
999            let receiver_type = match rx.inner {
1000                crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
1001                    crate::aggregated::ReceiverType::Unbounded(unbounded)
1002                }
1003                crate::context::receiver::ReceiverInner::Bounded(bounded) => {
1004                    crate::aggregated::ReceiverType::Bounded(bounded)
1005                }
1006            };
1007            receivers.push((topic_path.clone(), receiver_type));
1008        }
1009
1010        // 3. Return aggregated receiver with support for both channel types
1011        Ok(crate::aggregated::AggregatedReceiver::new_mixed(
1012            pattern.to_string(),
1013            receivers,
1014        ))
1015    }
1016
1017    /// Publish a message to a topic using a raw string path (for RPC and advanced use cases)
1018    ///
1019    /// # Arguments
1020    ///
1021    /// * `topic_path` - Raw topic path string
1022    /// * `message` - Message to publish
1023    ///
1024    /// # Note
1025    ///
1026    /// This is a low-level method. Prefer using `publish_to()` with type-safe topics
1027    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
1028    #[cfg(feature = "messaging")]
1029    pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
1030    where
1031        T: Message + Serialize,
1032    {
1033        let mut bus = self.message_bus.write().await;
1034
1035        bus.publish(topic_path, message)
1036            .await
1037            .with_context(|| {
1038                format!(
1039                    "Failed to publish to raw topic '{}' from node '{}'",
1040                    topic_path, self.node_id
1041                )
1042            })
1043            .map_err(|e| Mecha10Error::MessagingError {
1044                message: format!("{:#}", e),
1045                suggestion: "Check that Redis is running and accepting connections".to_string(),
1046            })?;
1047
1048        Ok(())
1049    }
1050
1051    /// Check if shutdown has been requested
1052    pub async fn is_shutdown(&self) -> bool {
1053        *self.shutdown.read().await
1054    }
1055
1056    /// Request shutdown
1057    pub async fn shutdown(&self) {
1058        *self.shutdown.write().await = true;
1059
1060        // Stop Redis bridge if running
1061        #[cfg(feature = "messaging")]
1062        {
1063            let bridge_opt = self.redis_bridge.read().await;
1064            if let Some(bridge) = bridge_opt.as_ref() {
1065                if let Err(e) = bridge.stop().await {
1066                    tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1067                }
1068            }
1069        }
1070    }
1071
1072    /// Wait for shutdown signal
1073    pub async fn wait_for_shutdown(&self) {
1074        loop {
1075            if self.is_shutdown().await {
1076                break;
1077            }
1078            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1079        }
1080    }
1081
1082    /// Enter a logging span with this node's context
1083    ///
1084    /// This returns a guard that, when entered, adds the node_id to all log messages.
1085    /// The span is automatically exited when the guard is dropped.
1086    ///
1087    /// # Example
1088    ///
1089    /// ```rust
1090    /// use mecha10::prelude::*;
1091    ///
1092    /// # async fn example(ctx: &Context) -> Result<()> {
1093    /// let _guard = ctx.logging_span().entered();
1094    /// info!("This message includes node_id automatically");
1095    /// debug!("So does this one");
1096    /// # Ok(())
1097    /// # }
1098    /// ```
1099    pub fn logging_span(&self) -> tracing::Span {
1100        tracing::info_span!("node", node_id = %self.node_id)
1101    }
1102
1103    /// Subscribe to all camera topics
1104    ///
1105    /// Convenience method for `subscribe_aggregated::<Image>("/sensor/camera/*")`
1106    ///
1107    /// # Example
1108    ///
1109    /// ```rust,no_run
1110    /// use mecha10::prelude::*;
1111    ///
1112    /// # async fn example(ctx: &Context) -> Result<()> {
1113    /// let mut cameras = ctx.subscribe_all_cameras().await?;
1114    /// while let Some((topic, image)) = cameras.recv().await {
1115    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1116    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1117    /// }
1118    /// # Ok(())
1119    /// # }
1120    /// ```
1121    #[cfg(feature = "messaging")]
1122    pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
1123        self.subscribe_aggregated("/sensor/camera/*").await
1124    }
1125
1126    /// Subscribe to all sensor topics
1127    ///
1128    /// Subscribes to all sensor data including cameras, LiDAR, IMU, odometry, etc.
1129    ///
1130    /// # Example
1131    ///
1132    /// ```rust,no_run
1133    /// use mecha10::prelude::*;
1134    ///
1135    /// # async fn example(ctx: &Context) -> Result<()> {
1136    /// // Subscribe to all sensors (returns untyped messages)
1137    /// let topics = ctx.discover_topics("/sensor/**").await?;
1138    /// println!("Found {} sensor topics", topics.len());
1139    /// # Ok(())
1140    /// # }
1141    /// ```
1142    #[cfg(feature = "messaging")]
1143    pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
1144        self.discover_topics("/sensor/**").await
1145    }
1146
1147    /// Subscribe to all LiDAR topics
1148    ///
1149    /// Convenience method for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`
1150    #[cfg(feature = "messaging")]
1151    pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
1152        self.subscribe_aggregated("/sensor/lidar/*").await
1153    }
1154
1155    /// Subscribe to all IMU topics
1156    ///
1157    /// Convenience method for `subscribe_aggregated::<IMU>("/sensor/imu/*")`
1158    #[cfg(feature = "messaging")]
1159    pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
1160        self.subscribe_aggregated("/sensor/imu/*").await
1161    }
1162
1163    /// Subscribe to all motor topics
1164    ///
1165    /// Convenience method for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`
1166    #[cfg(feature = "messaging")]
1167    pub async fn subscribe_all_motors(
1168        &self,
1169    ) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
1170        self.subscribe_aggregated("/actuator/motor/*").await
1171    }
1172
1173    /// Subscribe to a topic pattern with wildcard
1174    ///
1175    /// Generic helper for subscribing to multiple topics matching a pattern.
1176    ///
1177    /// # Example
1178    ///
1179    /// ```rust,no_run
1180    /// use mecha10::prelude::*;
1181    /// use mecha10::topics::sensor;
1182    ///
1183    /// # async fn example(ctx: &Context) -> Result<()> {
1184    /// // Subscribe to all instances of a specific topic type
1185    /// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
1186    /// # Ok(())
1187    /// # }
1188    /// ```
1189    #[cfg(feature = "messaging")]
1190    pub async fn subscribe_pattern<T>(
1191        &self,
1192        topic: crate::topics::Topic<T>,
1193        pattern: &str,
1194    ) -> Result<crate::aggregated::AggregatedReceiver<T>>
1195    where
1196        T: Message + DeserializeOwned + Send + 'static,
1197    {
1198        let full_pattern = format!("{}/{}", topic.path(), pattern);
1199        self.subscribe_aggregated(&full_pattern).await
1200    }
1201
1202    /// Get or create a state manager for this node
1203    ///
1204    /// The state manager provides persistent storage for node state across restarts.
1205    /// By default, it uses filesystem storage in `./state/` directory.
1206    ///
1207    /// # Example
1208    ///
1209    /// ```rust,no_run
1210    /// use mecha10::prelude::*;
1211    /// use serde::{Deserialize, Serialize};
1212    ///
1213    /// #[derive(Debug, Clone, Serialize, Deserialize)]
1214    /// struct MyState {
1215    ///     counter: u64,
1216    ///     position: (f64, f64),
1217    /// }
1218    ///
1219    /// # async fn example(ctx: &Context) -> Result<()> {
1220    /// let state_mgr = ctx.state_manager().await?;
1221    ///
1222    /// // Load previous state
1223    /// let mut state = state_mgr
1224    ///     .load::<MyState>("my_node")
1225    ///     .await?
1226    ///     .unwrap_or(MyState {
1227    ///         counter: 0,
1228    ///         position: (0.0, 0.0),
1229    ///     });
1230    ///
1231    /// // Update state
1232    /// state.counter += 1;
1233    ///
1234    /// // Save state
1235    /// state_mgr.save("my_node", &state).await?;
1236    /// # Ok(())
1237    /// # }
1238    /// ```
1239    pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1240        let mut mgr = self.state_manager.write().await;
1241
1242        if let Some(existing) = mgr.as_ref() {
1243            return Ok(Arc::clone(existing));
1244        }
1245
1246        // Create default filesystem-based state manager
1247        let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1248        let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1249        let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1250
1251        *mgr = Some(Arc::clone(&arc_mgr));
1252
1253        Ok(arc_mgr)
1254    }
1255
1256    /// Set a custom state manager for this context
1257    ///
1258    /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1259    ///
1260    /// # Example
1261    ///
1262    /// ```rust,no_run
1263    /// use mecha10::prelude::*;
1264    /// use std::sync::Arc;
1265    ///
1266    /// # async fn example(ctx: &Context) -> Result<()> {
1267    /// // Use in-memory state manager for testing
1268    /// let mem_mgr = Arc::new(MemoryStateManager::new());
1269    /// ctx.set_state_manager(mem_mgr).await;
1270    /// # Ok(())
1271    /// # }
1272    /// ```
1273    pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1274        let mut mgr = self.state_manager.write().await;
1275        *mgr = Some(manager);
1276    }
1277
1278    // ========================================
1279    // Topic Discovery Helpers
1280    // ========================================
1281
1282    /// List all sensor topics currently active in the system
1283    ///
1284    /// Returns all topics under `/sensor/*` including cameras, LiDAR, IMU, GPS, etc.
1285    ///
1286    /// # Example
1287    ///
1288    /// ```rust,no_run
1289    /// use mecha10::prelude::*;
1290    ///
1291    /// # async fn example(ctx: &Context) -> Result<()> {
1292    /// let sensors = ctx.list_sensor_topics().await?;
1293    /// println!("Available sensors:");
1294    /// for topic in sensors {
1295    ///     println!("  - {}", topic);
1296    /// }
1297    /// # Ok(())
1298    /// # }
1299    /// ```
1300    #[cfg(feature = "messaging")]
1301    pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
1302        self.discover_topics("/sensor/*").await
1303    }
1304
1305    /// List all actuator topics currently active in the system
1306    ///
1307    /// Returns all topics under `/actuator/*` including motors, servos, grippers, etc.
1308    ///
1309    /// # Example
1310    ///
1311    /// ```rust,no_run
1312    /// use mecha10::prelude::*;
1313    ///
1314    /// # async fn example(ctx: &Context) -> Result<()> {
1315    /// let actuators = ctx.list_actuator_topics().await?;
1316    /// println!("Available actuators:");
1317    /// for topic in actuators {
1318    ///     println!("  - {}", topic);
1319    /// }
1320    /// # Ok(())
1321    /// # }
1322    /// ```
1323    #[cfg(feature = "messaging")]
1324    pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
1325        self.discover_topics("/actuator/*").await
1326    }
1327
1328    /// List all topics in the system
1329    ///
1330    /// Returns all active topics regardless of category.
1331    ///
1332    /// # Example
1333    ///
1334    /// ```rust,no_run
1335    /// use mecha10::prelude::*;
1336    ///
1337    /// # async fn example(ctx: &Context) -> Result<()> {
1338    /// let all_topics = ctx.list_all_topics().await?;
1339    /// println!("All active topics ({} total):", all_topics.len());
1340    /// for topic in all_topics {
1341    ///     println!("  - {}", topic);
1342    /// }
1343    /// # Ok(())
1344    /// # }
1345    /// ```
1346    #[cfg(feature = "messaging")]
1347    pub async fn list_all_topics(&self) -> Result<Vec<String>> {
1348        self.discover_topics("*").await
1349    }
1350
1351    /// List all camera topics
1352    ///
1353    /// Returns all topics under `/sensor/camera/*`.
1354    ///
1355    /// # Example
1356    ///
1357    /// ```rust,no_run
1358    /// use mecha10::prelude::*;
1359    ///
1360    /// # async fn example(ctx: &Context) -> Result<()> {
1361    /// let cameras = ctx.list_camera_topics().await?;
1362    /// println!("Available cameras:");
1363    /// for topic in cameras {
1364    ///     println!("  - {}", topic);
1365    /// }
1366    /// # Ok(())
1367    /// # }
1368    /// ```
1369    #[cfg(feature = "messaging")]
1370    pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
1371        self.discover_topics("/sensor/camera/*").await
1372    }
1373
1374    /// List all LiDAR topics
1375    ///
1376    /// Returns all topics under `/sensor/lidar/*`.
1377    #[cfg(feature = "messaging")]
1378    pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
1379        self.discover_topics("/sensor/lidar/*").await
1380    }
1381
1382    /// List all IMU topics
1383    ///
1384    /// Returns all topics under `/sensor/imu/*`.
1385    #[cfg(feature = "messaging")]
1386    pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
1387        self.discover_topics("/sensor/imu/*").await
1388    }
1389
1390    /// List all motor topics
1391    ///
1392    /// Returns all topics under `/actuator/motor/*`.
1393    #[cfg(feature = "messaging")]
1394    pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
1395        self.discover_topics("/actuator/motor/*").await
1396    }
1397
1398    /// Print a formatted summary of all active topics
1399    ///
1400    /// Useful for debugging and introspection. Groups topics by category
1401    /// and displays them in a human-readable format.
1402    ///
1403    /// # Example
1404    ///
1405    /// ```rust,no_run
1406    /// use mecha10::prelude::*;
1407    ///
1408    /// # async fn example(ctx: &Context) -> Result<()> {
1409    /// ctx.print_topic_summary().await?;
1410    /// // Output:
1411    /// // === Active Topics ===
1412    /// // Sensors (3):
1413    /// //   - /sensor/camera/front
1414    /// //   - /sensor/lidar/main
1415    /// //   - /sensor/imu/base
1416    /// // Actuators (2):
1417    /// //   - /actuator/motor/left
1418    /// //   - /actuator/motor/right
1419    /// // =====================
1420    /// # Ok(())
1421    /// # }
1422    /// ```
1423    #[cfg(feature = "messaging")]
1424    pub async fn print_topic_summary(&self) -> Result<()> {
1425        let sensors = self.list_sensor_topics().await.unwrap_or_default();
1426        let actuators = self.list_actuator_topics().await.unwrap_or_default();
1427        let all_topics = self.list_all_topics().await?;
1428
1429        let other_topics: Vec<String> = all_topics
1430            .into_iter()
1431            .filter(|t| !t.starts_with("/sensor/") && !t.starts_with("/actuator/"))
1432            .collect();
1433
1434        println!("=== Active Topics ===");
1435
1436        if !sensors.is_empty() {
1437            println!("Sensors ({}):", sensors.len());
1438            for topic in sensors {
1439                println!("  - {}", topic);
1440            }
1441        }
1442
1443        if !actuators.is_empty() {
1444            println!("Actuators ({}):", actuators.len());
1445            for topic in actuators {
1446                println!("  - {}", topic);
1447            }
1448        }
1449
1450        if !other_topics.is_empty() {
1451            println!("Other ({}):", other_topics.len());
1452            for topic in other_topics {
1453                println!("  - {}", topic);
1454            }
1455        }
1456
1457        println!("=====================");
1458
1459        Ok(())
1460    }
1461}