mecha10_core/context/
mod.rs

1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
6//! This module has been refactored into smaller sub-modules for better organization:
7//! - `infra_config`: Infrastructure configuration types
8//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27// Import the complete Context struct and all implementations from the backup file
28// This maintains backward compatibility while improving code organization
29
30pub struct Context {
31    /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
32    node_id: String,
33
34    /// Optional instance name
35    instance: Option<String>,
36
37    /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
38    #[cfg(feature = "messaging")]
39    message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,
40
41    /// Control plane message bus for robot-to-cloud communication (optional)
42    #[cfg(feature = "messaging")]
43    control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,
44
45    /// Redis bridge for forwarding messages between local and control plane
46    #[cfg(feature = "messaging")]
47    redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,
48
49    /// Shutdown signal
50    shutdown: Arc<RwLock<bool>>,
51
52    /// State manager for persistent node state (lazy initialized)
53    state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
54}
55impl Clone for Context {
56    fn clone(&self) -> Self {
57        Self {
58            node_id: self.node_id.clone(),
59            instance: self.instance.clone(),
60            #[cfg(feature = "messaging")]
61            message_bus: Arc::clone(&self.message_bus),
62            #[cfg(feature = "messaging")]
63            control_plane_bus: Arc::clone(&self.control_plane_bus),
64            #[cfg(feature = "messaging")]
65            redis_bridge: Arc::clone(&self.redis_bridge),
66            shutdown: Arc::clone(&self.shutdown),
67            state_manager: Arc::clone(&self.state_manager),
68        }
69    }
70}
71impl Context {
72    /// Create a new context
73    ///
74    /// # Arguments
75    ///
76    /// * `node_id` - Unique identifier for this node
77    ///
78    /// # Example
79    ///
80    /// ```rust
81    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82    /// let ctx = Context::new("my-node").await?;
83    /// # Ok(())
84    /// # }
85    /// ```
86    pub async fn new(node_id: &str) -> Result<Self> {
87        #[cfg(feature = "messaging")]
88        let (message_bus, control_plane_bus, redis_bridge) = {
89            // Load infrastructure config
90            let config = Self::load_infrastructure_config()?;
91
92            // Connect to local Redis
93            let redis_url = Self::get_redis_url_from_config(&config)?;
94            let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
95                .await
96                .with_context(|| {
97                    format!(
98                        "Failed to connect local message bus for node '{}' to Redis at '{}'",
99                        node_id, redis_url
100                    )
101                })
102                .map_err(|e| Mecha10Error::MessagingError {
103                    message: format!("{:#}", e),
104                    suggestion: "Ensure local Redis is running at the configured URL".to_string(),
105                })?;
106
107            // Connect to control plane Redis if configured
108            let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
109                let cp_url = &cp_config.url;
110                match mecha10_messaging::MessageBus::connect(cp_url, node_id).await {
111                    Ok(bus) => {
112                        tracing::info!(
113                            "Connected to control plane Redis at '{}' for node '{}'",
114                            cp_url,
115                            node_id
116                        );
117                        Some(bus)
118                    }
119                    Err(e) => {
120                        tracing::warn!(
121                            "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
122                            cp_url,
123                            e
124                        );
125                        None
126                    }
127                }
128            } else {
129                None
130            };
131
132            // Initialize Redis bridge if both connections are available
133            let bridge = if let Some(_cp_bus_ref) = &cp_bus {
134                // Get bridge configuration (use default if not specified)
135                let bridge_config = config.bridge.unwrap_or_default();
136
137                if bridge_config.enabled {
138                    // Create bridge with separate clones of the buses
139                    let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
140                        .await
141                        .map_err(|e| Mecha10Error::MessagingError {
142                            message: format!("Failed to create bridge connection to local Redis: {}", e),
143                            suggestion: "Check local Redis connection".to_string(),
144                        })?;
145
146                    let cp_url = config
147                        .control_plane_redis
148                        .as_ref()
149                        .map(|c| c.url.clone())
150                        .unwrap_or_default();
151                    let bridge_cp_bus =
152                        mecha10_messaging::MessageBus::connect(&cp_url, node_id)
153                            .await
154                            .map_err(|e| Mecha10Error::MessagingError {
155                                message: format!("Failed to create bridge connection to control plane Redis: {}", e),
156                                suggestion: "Check control plane Redis connection".to_string(),
157                            })?;
158
159                    // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
160                    let redis_bridge_config = crate::redis_bridge::BridgeConfig {
161                        enabled: bridge_config.enabled,
162                        local_topics: bridge_config.local_topics,
163                        bridged_topics: bridge_config.bridged_topics,
164                        robot_id: bridge_config.robot_id,
165                    };
166
167                    let bridge =
168                        crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
169
170                    // Start the bridge
171                    if let Err(e) = bridge.start().await {
172                        tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
173                        None
174                    } else {
175                        tracing::info!("Redis bridge started successfully");
176                        Some(bridge)
177                    }
178                } else {
179                    tracing::debug!("Redis bridge is disabled in configuration");
180                    None
181                }
182            } else {
183                None
184            };
185
186            (local_bus, cp_bus, bridge)
187        };
188
189        Ok(Self {
190            node_id: node_id.to_string(),
191            instance: None,
192            #[cfg(feature = "messaging")]
193            message_bus: Arc::new(RwLock::new(message_bus)),
194            #[cfg(feature = "messaging")]
195            control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
196            #[cfg(feature = "messaging")]
197            redis_bridge: Arc::new(RwLock::new(redis_bridge)),
198            shutdown: Arc::new(RwLock::new(false)),
199            state_manager: Arc::new(RwLock::new(None)),
200        })
201    }
202
203    /// Set an instance name for this context (builder pattern)
204    ///
205    /// This allows the same node implementation to run multiple instances
206    /// with different names. The node_id becomes "{node_name}/{instance}".
207    ///
208    /// # Example
209    ///
210    /// ```rust
211    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
212    /// // Single instance (no instance name)
213    /// let ctx = Context::new("camera_fake").await?;
214    /// // node_id: "camera_fake"
215    ///
216    /// // Multi-instance (with instance names)
217    /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
218    /// // node_id: "camera_fake/left"
219    ///
220    /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
221    /// // node_id: "camera_fake/right"
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn with_instance(mut self, instance: &str) -> Self {
226        self.instance = Some(instance.to_string());
227        self.node_id = format!("{}/{}", self.node_id, instance);
228        self
229    }
230
231    /// Get the instance name if set
232    ///
233    /// Returns `None` for single-instance nodes, or `Some(instance)` for
234    /// multi-instance nodes.
235    pub fn instance(&self) -> Option<&str> {
236        self.instance.as_deref()
237    }
238
239    /// Load infrastructure configuration based on environment
240    ///
241    /// Priority order:
242    /// 1. mecha10.json (project configuration) - redis.url field
243    /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
244    /// 3. config/infrastructure.<environment>.yaml
245    ///
246    /// Set via MECHA10_ENVIRONMENT environment variable.
247    fn load_infrastructure_config() -> Result<InfrastructureConfig> {
248        use crate::config::load_config;
249        use std::env;
250        use std::path::PathBuf;
251
252        // 1. Try loading from mecha10.json FIRST (project configuration)
253        if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
254            return Ok(InfrastructureConfig {
255                redis: crate::context::infra_config::RedisConfig {
256                    url: redis_url,
257                    max_connections: 10,
258                    connection_timeout_ms: 5000,
259                },
260                control_plane_redis: None,
261                bridge: None,
262                postgres: None,
263                mlflow: None,
264                minio: None,
265            });
266        }
267
268        // 2. Check for environment variable overrides
269        // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
270        if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
271            return Ok(InfrastructureConfig {
272                redis: crate::context::infra_config::RedisConfig {
273                    url: redis_url,
274                    max_connections: 10,
275                    connection_timeout_ms: 5000,
276                },
277                control_plane_redis: None,
278                bridge: None,
279                postgres: None,
280                mlflow: None,
281                minio: None,
282            });
283        }
284
285        // 3. Load from infrastructure.yaml
286        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
287
288        let config_filename = format!("infrastructure.{}.yaml", environment);
289
290        // Try multiple paths in order:
291        // 1. Current directory (for running from workspace root)
292        // 2. Workspace root (for running tests from target/debug/deps)
293        // 3. Two levels up (for running tests from packages/core/target/debug/deps)
294        let paths_to_try = vec![
295            PathBuf::from(format!("config/{}", config_filename)),
296            PathBuf::from(format!("../../config/{}", config_filename)),
297            PathBuf::from(format!("../../../../config/{}", config_filename)),
298        ];
299
300        for path in &paths_to_try {
301            if path.exists() {
302                return load_config(path)
303                    .with_context(|| {
304                        format!(
305                            "Failed to load infrastructure config for environment '{}' from '{}'",
306                            environment,
307                            path.display()
308                        )
309                    })
310                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
311            }
312        }
313
314        // If no config file found, return error with helpful message
315        Err(Mecha10Error::Configuration(format!(
316            "Failed to load infrastructure config for environment '{}'. \
317             Tried paths: {}. \
318             Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
319             or configure redis.url in mecha10.json.",
320            environment,
321            paths_to_try
322                .iter()
323                .map(|p| p.display().to_string())
324                .collect::<Vec<_>>()
325                .join(", ")
326        )))
327    }
328
329    /// Try to load Redis URL from mecha10.json
330    ///
331    /// Looks for mecha10.json in current directory and reads redis.url field.
332    fn try_load_redis_from_mecha10_json() -> Result<String> {
333        use std::path::PathBuf;
334
335        // Try multiple paths to find mecha10.json
336        let paths_to_try = vec![
337            PathBuf::from("mecha10.json"),
338            PathBuf::from("../mecha10.json"),
339            PathBuf::from("../../mecha10.json"),
340        ];
341
342        for path in &paths_to_try {
343            if !path.exists() {
344                continue;
345            }
346
347            // Read and parse mecha10.json
348            let content = std::fs::read_to_string(path)
349                .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
350
351            let json: serde_json::Value = serde_json::from_str(&content)
352                .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
353
354            // Extract redis.url
355            if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
356                return Ok(redis_url.to_string());
357            }
358        }
359
360        Err(Mecha10Error::Configuration(
361            "No mecha10.json found with redis.url configuration".to_string(),
362        ))
363    }
364
365    /// Get Redis URL from infrastructure configuration (helper for internal use)
366    fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
367        // First try environment variables (for backward compatibility and testing)
368        if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
369            return Ok(url);
370        }
371        if let Ok(url) = std::env::var("REDIS_URL") {
372            return Ok(url);
373        }
374
375        // Use config
376        Ok(config.redis.url.clone())
377    }
378
379    /// Get Redis URL from infrastructure configuration
380    pub fn get_redis_url() -> Result<String> {
381        let config = Self::load_infrastructure_config()?;
382        Self::get_redis_url_from_config(&config)
383    }
384
385    /// Get the node ID
386    pub fn node_id(&self) -> &str {
387        &self.node_id
388    }
389
390    /// Load node configuration with environment-aware file-level fallback
391    ///
392    /// This method implements the V2 configuration system with:
393    /// - Environment-specific configs (dev/staging/production)
394    /// - File-level fallback (no key-by-key merging)
395    /// - Automatic path resolution
396    ///
397    /// **Fallback Priority:**
398    /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
399    /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
400    /// 3. `T::default()` (code default)
401    ///
402    /// **Environment Detection:**
403    /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
404    /// - Supported values: "dev", "staging", "production", or custom
405    ///
406    /// # Type Parameters
407    ///
408    /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
409    ///
410    /// # Arguments
411    ///
412    /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
413    ///
414    /// # Returns
415    ///
416    /// The loaded configuration of type `T`, using the first available source
417    /// in the fallback chain.
418    ///
419    /// # Example
420    ///
421    /// ```rust
422    /// use mecha10::prelude::*;
423    /// use serde::{Deserialize, Serialize};
424    ///
425    /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
426    /// struct SimBridgeConfig {
427    ///     camera_width: u32,
428    ///     camera_height: u32,
429    ///     camera_fps: u32,
430    /// }
431    ///
432    /// # async fn example(ctx: &Context) -> Result<()> {
433    /// // Loads from configs/{env}/nodes/simulation-bridge/config.json
434    /// // Falls back to configs/common/nodes/simulation-bridge/config.json
435    /// // Falls back to SimBridgeConfig::default()
436    /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
437    /// # Ok(())
438    /// # }
439    /// ```
440    pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
441    where
442        T: DeserializeOwned + Default,
443    {
444        use crate::config::load_config;
445        use std::env;
446        use std::path::PathBuf;
447
448        // Get environment from env var (default: "dev")
449        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
450
451        // Try multiple base paths (for running from different locations)
452        let base_paths = vec![
453            PathBuf::from("."),           // Current directory
454            PathBuf::from("../.."),       // From target/debug/deps
455            PathBuf::from("../../../.."), // From packages/*/target/debug/deps
456        ];
457
458        // Build fallback chain for each base path
459        for base_path in &base_paths {
460            // 1. Try environment-specific config
461            let env_config_path = base_path
462                .join("configs")
463                .join(&environment)
464                .join("nodes")
465                .join(node_name)
466                .join("config.json");
467
468            if env_config_path.exists() {
469                tracing::debug!(
470                    "Loading node config for '{}' from environment-specific path: {}",
471                    node_name,
472                    env_config_path.display()
473                );
474                return load_config(&env_config_path)
475                    .with_context(|| {
476                        format!(
477                            "Failed to load node config for '{}' from '{}'",
478                            node_name,
479                            env_config_path.display()
480                        )
481                    })
482                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
483            }
484
485            // 2. Try common config
486            let common_config_path = base_path
487                .join("configs")
488                .join("common")
489                .join("nodes")
490                .join(node_name)
491                .join("config.json");
492
493            if common_config_path.exists() {
494                tracing::debug!(
495                    "Loading node config for '{}' from common path: {}",
496                    node_name,
497                    common_config_path.display()
498                );
499                return load_config(&common_config_path)
500                    .with_context(|| {
501                        format!(
502                            "Failed to load node config for '{}' from '{}'",
503                            node_name,
504                            common_config_path.display()
505                        )
506                    })
507                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
508            }
509        }
510
511        // 3. Fall back to default
512        tracing::debug!(
513            "No config file found for node '{}', using Default implementation",
514            node_name
515        );
516        Ok(T::default())
517    }
518
519    /// Check if control plane Redis connection is available
520    #[cfg(feature = "messaging")]
521    pub async fn has_control_plane(&self) -> bool {
522        self.control_plane_bus.read().await.is_some()
523    }
524
525    /// Get control plane Redis URL from infrastructure configuration
526    pub fn get_control_plane_redis_url() -> Result<Option<String>> {
527        let config = Self::load_infrastructure_config()?;
528        Ok(config.control_plane_redis.map(|c| c.url))
529    }
530
531    /// Subscribe to a topic with type-safe message handling
532    ///
533    /// # Arguments
534    ///
535    /// * `topic` - Type-safe topic to subscribe to
536    ///
537    /// # Returns
538    ///
539    /// A receiver that will yield messages of type `T`
540    ///
541    /// # Example
542    ///
543    /// ```rust
544    /// use mecha10::prelude::*;
545    /// use mecha10::topics::sensor;
546    ///
547    /// # async fn example(ctx: &Context) -> Result<()> {
548    /// // Type is inferred as Receiver<Image>
549    /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
550    ///
551    /// while let Some(image) = images.recv().await {
552    ///     println!("Image: {}x{}", image.width, image.height);
553    /// }
554    /// # Ok(())
555    /// # }
556    /// ```
557    #[cfg(feature = "messaging")]
558    pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
559    where
560        T: Message + DeserializeOwned + Send + 'static,
561    {
562        let mut bus = self.message_bus.write().await;
563
564        // Create a unique consumer group for this node
565        let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
566
567        let subscriber = bus
568            .subscribe::<T>(topic.path(), &consumer_group)
569            .await
570            .with_context(|| {
571                format!(
572                    "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
573                    topic.path(),
574                    consumer_group,
575                    self.node_id
576                )
577            })
578            .map_err(|e| Mecha10Error::MessagingError {
579                message: format!("{:#}", e),
580                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
581            })?;
582
583        // Convert to our Receiver type
584        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
585
586        // Spawn task to forward messages
587        tokio::spawn(async move {
588            let mut sub = subscriber;
589            while let Some(msg) = sub.recv().await {
590                // Auto-acknowledge message before moving payload
591                if let Err(e) = msg.ack().await {
592                    tracing::warn!(
593                        topic = %msg.topic,
594                        message_id = %msg.id,
595                        error = %e,
596                        "Failed to acknowledge message (will be redelivered on reconnect)"
597                    );
598                }
599                // Extract payload and forward
600                if tx.send(msg.payload).is_err() {
601                    break; // Receiver dropped
602                }
603            }
604        });
605
606        Ok(Receiver::from(rx))
607    }
608
609    /// Publish a message to a specific topic
610    ///
611    /// # Arguments
612    ///
613    /// * `topic` - Type-safe topic to publish to
614    /// * `message` - Message to publish (type checked against topic)
615    ///
616    /// # Example
617    ///
618    /// ```rust
619    /// use mecha10::prelude::*;
620    /// use mecha10::topics::sensor;
621    /// use mecha10::messages::Image;
622    ///
623    /// # async fn example(ctx: &Context) -> Result<()> {
624    /// let image = Image {
625    ///     timestamp: now_micros(),
626    ///     width: 640,
627    ///     height: 480,
628    ///     encoding: "rgb8".to_string(),
629    ///     data: vec![],
630    /// };
631    ///
632    /// // Type checked: image must be of type Image
633    /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
634    /// # Ok(())
635    /// # }
636    /// ```
637    #[cfg(feature = "messaging")]
638    pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
639    where
640        T: Message + Serialize,
641    {
642        let mut bus = self.message_bus.write().await;
643
644        bus.publish(topic.path(), message)
645            .await
646            .with_context(|| {
647                format!(
648                    "Failed to publish to topic '{}' from node '{}'",
649                    topic.path(),
650                    self.node_id
651                )
652            })
653            .map_err(|e| Mecha10Error::MessagingError {
654                message: format!("{:#}", e),
655                suggestion: "Check that Redis is running and accepting connections".to_string(),
656            })?;
657
658        Ok(())
659    }
660
661    /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
662    ///
663    /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
664    ///
665    /// # Arguments
666    /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
667    /// * `message` - The message to publish
668    pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
669    where
670        T: Message + Serialize,
671    {
672        let mut bus = self.message_bus.write().await;
673
674        bus.publish(topic_path, message)
675            .await
676            .with_context(|| {
677                format!(
678                    "Failed to publish to topic '{}' from node '{}'",
679                    topic_path, self.node_id
680                )
681            })
682            .map_err(|e| Mecha10Error::MessagingError {
683                message: format!("{:#}", e),
684                suggestion: "Check that Redis is running and accepting connections".to_string(),
685            })?;
686
687        Ok(())
688    }
689
690    /// Publish a message to an instance-scoped topic
691    ///
692    /// This automatically appends the context's instance name to the topic path.
693    /// If no instance is set, this behaves like `publish_to()`.
694    ///
695    /// # Arguments
696    ///
697    /// * `topic` - Type-safe topic to publish to
698    /// * `message` - Message to publish (type checked against topic)
699    ///
700    /// # Example
701    ///
702    /// ```rust
703    /// use mecha10::prelude::*;
704    /// use mecha10::topics::sensor;
705    /// use mecha10::messages::Image;
706    ///
707    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
708    /// let ctx = Context::new("camera_fake").await?.with_instance("left");
709    ///
710    /// let image = Image {
711    ///     timestamp: now_micros(),
712    ///     width: 640,
713    ///     height: 480,
714    ///     encoding: "rgb8".to_string(),
715    ///     data: vec![],
716    /// };
717    ///
718    /// // Publishes to "/sensor/camera/rgb/left"
719    /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
720    /// # Ok(())
721    /// # }
722    /// ```
723    #[cfg(feature = "messaging")]
724    pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
725    where
726        T: Message + Serialize,
727    {
728        let scoped_path = if let Some(instance) = &self.instance {
729            format!("{}/{}", topic.path(), instance)
730        } else {
731            topic.path().to_string()
732        };
733
734        self.publish_raw(&scoped_path, message).await
735    }
736
737    /// Publish a message to a topic with an explicit instance suffix
738    ///
739    /// This allows publishing to a specific instance's topic regardless of
740    /// the context's own instance name.
741    ///
742    /// # Arguments
743    ///
744    /// * `topic` - Type-safe topic to publish to
745    /// * `instance` - Instance name to append to topic path
746    /// * `message` - Message to publish (type checked against topic)
747    ///
748    /// # Example
749    ///
750    /// ```rust
751    /// use mecha10::prelude::*;
752    /// use mecha10::topics::actuator;
753    ///
754    /// # async fn example(ctx: &Context) -> Result<()> {
755    /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
756    ///
757    /// // Send to specific motor instances
758    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
759    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
760    /// # Ok(())
761    /// # }
762    /// ```
763    #[cfg(feature = "messaging")]
764    pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
765    where
766        T: Message + Serialize,
767    {
768        let instance_path = format!("{}/{}", topic.path(), instance);
769        self.publish_raw(&instance_path, message).await
770    }
771
772    /// Subscribe to an instance-scoped topic
773    ///
774    /// This automatically appends the context's instance name to the topic path.
775    /// If no instance is set, this behaves like `subscribe()`.
776    ///
777    /// # Example
778    ///
779    /// ```rust
780    /// use mecha10::prelude::*;
781    /// use mecha10::topics::sensor;
782    ///
783    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
784    /// let ctx = Context::new("vision_node").await?;
785    ///
786    /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
787    /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
788    ///
789    /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
790    /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
791    /// # Ok(())
792    /// # }
793    /// ```
794    #[cfg(feature = "messaging")]
795    pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
796    where
797        T: Message + DeserializeOwned + Send + 'static,
798    {
799        let scoped_path = format!("{}/{}", topic.path(), instance);
800        self.subscribe_raw(&scoped_path).await
801    }
802
803    /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
804    ///
805    /// # Arguments
806    ///
807    /// * `topic_path` - Raw topic path string
808    ///
809    /// # Returns
810    ///
811    /// A receiver that will yield messages of type `T`
812    ///
813    /// # Note
814    ///
815    /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
816    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
817    #[cfg(feature = "messaging")]
818    pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
819    where
820        T: Message + DeserializeOwned + Send + 'static,
821    {
822        let mut bus = self.message_bus.write().await;
823
824        // Create a unique consumer group for this node
825        let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
826
827        let subscriber = bus
828            .subscribe::<T>(topic_path, &consumer_group)
829            .await
830            .with_context(|| {
831                format!(
832                    "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
833                    topic_path, consumer_group, self.node_id
834                )
835            })
836            .map_err(|e| Mecha10Error::MessagingError {
837                message: format!("{:#}", e),
838                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
839            })?;
840
841        // Convert to our Receiver type
842        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
843
844        // Spawn task to forward messages
845        tokio::spawn(async move {
846            let mut sub = subscriber;
847            while let Some(msg) = sub.recv().await {
848                // Auto-acknowledge message before moving payload
849                if let Err(e) = msg.ack().await {
850                    tracing::warn!(
851                        topic = %msg.topic,
852                        message_id = %msg.id,
853                        error = %e,
854                        "Failed to acknowledge message (will be redelivered on reconnect)"
855                    );
856                }
857                // Extract payload and forward
858                if tx.send(msg.payload).is_err() {
859                    break; // Receiver dropped
860                }
861            }
862        });
863
864        Ok(Receiver::from(rx))
865    }
866
867    /// Discover topics matching a glob pattern
868    ///
869    /// Queries Redis for all topics matching the given pattern without subscribing.
870    /// Useful for inspecting available topics before subscribing.
871    ///
872    /// # Pattern Syntax
873    ///
874    /// - `*` matches any sequence of characters within a path segment
875    /// - `**` matches any sequence of path segments
876    /// - `{a,b}` matches either a or b (future enhancement)
877    ///
878    /// # Arguments
879    ///
880    /// * `pattern` - Glob pattern to match topics against
881    ///
882    /// # Returns
883    ///
884    /// A vector of topic paths that match the pattern
885    ///
886    /// # Example
887    ///
888    /// ```rust,no_run
889    /// use mecha10::prelude::*;
890    ///
891    /// # async fn example(ctx: &Context) -> Result<()> {
892    /// // Discover all camera topics
893    /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
894    /// println!("Found {} camera topics: {:?}", topics.len(), topics);
895    /// # Ok(())
896    /// # }
897    /// ```
898    #[cfg(feature = "messaging")]
899    pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
900        use crate::aggregated::pattern::convert_glob_to_redis;
901
902        let bus = self.message_bus.read().await;
903        let redis_pattern = convert_glob_to_redis(pattern);
904
905        bus.discover_topics(&redis_pattern)
906            .await
907            .with_context(|| {
908                format!(
909                    "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
910                    pattern, redis_pattern, self.node_id
911                )
912            })
913            .map_err(|e| Mecha10Error::MessagingError {
914                message: format!("{:#}", e),
915                suggestion: "Check that Redis is running and the pattern is valid".to_string(),
916            })
917    }
918
919    /// Subscribe to multiple topics matching a glob pattern
920    ///
921    /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
922    /// Each message is tagged with its source topic path.
923    ///
924    /// # Pattern Syntax
925    ///
926    /// - `*` matches any sequence of characters within a path segment
927    /// - `**` matches any sequence of path segments
928    /// - `{a,b}` matches either a or b (future enhancement)
929    ///
930    /// # Arguments
931    ///
932    /// * `pattern` - Glob pattern to match topics against
933    ///
934    /// # Returns
935    ///
936    /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
937    ///
938    /// # Examples
939    ///
940    /// ```rust,no_run
941    /// use mecha10::prelude::*;
942    ///
943    /// # async fn example(ctx: &Context) -> Result<()> {
944    /// // Subscribe to all cameras with one call
945    /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
946    ///
947    /// while let Some((topic, image)) = cameras.recv().await {
948    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
949    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
950    /// }
951    /// # Ok(())
952    /// # }
953    /// ```
954    #[cfg(feature = "messaging")]
955    pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
956    where
957        T: Message + DeserializeOwned + Send + 'static,
958    {
959        // 1. Discover all topics matching pattern
960        let matching_topics = self.discover_topics(pattern).await?;
961
962        if matching_topics.is_empty() {
963            return Err(Mecha10Error::MessagingError {
964                message: format!("No topics found matching pattern: {}", pattern),
965                suggestion: "Check that the pattern is correct and topics exist".to_string(),
966            });
967        }
968
969        // 2. Subscribe to each topic
970        let mut receivers = Vec::new();
971        for topic_path in matching_topics {
972            let rx = self.subscribe_raw::<T>(&topic_path).await?;
973            // Extract receiver and wrap in ReceiverType enum
974            let receiver_type = match rx.inner {
975                crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
976                    crate::aggregated::ReceiverType::Unbounded(unbounded)
977                }
978                crate::context::receiver::ReceiverInner::Bounded(bounded) => {
979                    crate::aggregated::ReceiverType::Bounded(bounded)
980                }
981            };
982            receivers.push((topic_path.clone(), receiver_type));
983        }
984
985        // 3. Return aggregated receiver with support for both channel types
986        Ok(crate::aggregated::AggregatedReceiver::new_mixed(
987            pattern.to_string(),
988            receivers,
989        ))
990    }
991
992    /// Publish a message to a topic using a raw string path (for RPC and advanced use cases)
993    ///
994    /// # Arguments
995    ///
996    /// * `topic_path` - Raw topic path string
997    /// * `message` - Message to publish
998    ///
999    /// # Note
1000    ///
1001    /// This is a low-level method. Prefer using `publish_to()` with type-safe topics
1002    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
1003    #[cfg(feature = "messaging")]
1004    pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
1005    where
1006        T: Message + Serialize,
1007    {
1008        let mut bus = self.message_bus.write().await;
1009
1010        bus.publish(topic_path, message)
1011            .await
1012            .with_context(|| {
1013                format!(
1014                    "Failed to publish to raw topic '{}' from node '{}'",
1015                    topic_path, self.node_id
1016                )
1017            })
1018            .map_err(|e| Mecha10Error::MessagingError {
1019                message: format!("{:#}", e),
1020                suggestion: "Check that Redis is running and accepting connections".to_string(),
1021            })?;
1022
1023        Ok(())
1024    }
1025
1026    /// Check if shutdown has been requested
1027    pub async fn is_shutdown(&self) -> bool {
1028        *self.shutdown.read().await
1029    }
1030
1031    /// Request shutdown
1032    pub async fn shutdown(&self) {
1033        *self.shutdown.write().await = true;
1034
1035        // Stop Redis bridge if running
1036        #[cfg(feature = "messaging")]
1037        {
1038            let bridge_opt = self.redis_bridge.read().await;
1039            if let Some(bridge) = bridge_opt.as_ref() {
1040                if let Err(e) = bridge.stop().await {
1041                    tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1042                }
1043            }
1044        }
1045    }
1046
1047    /// Wait for shutdown signal
1048    pub async fn wait_for_shutdown(&self) {
1049        loop {
1050            if self.is_shutdown().await {
1051                break;
1052            }
1053            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1054        }
1055    }
1056
1057    /// Enter a logging span with this node's context
1058    ///
1059    /// This returns a guard that, when entered, adds the node_id to all log messages.
1060    /// The span is automatically exited when the guard is dropped.
1061    ///
1062    /// # Example
1063    ///
1064    /// ```rust
1065    /// use mecha10::prelude::*;
1066    ///
1067    /// # async fn example(ctx: &Context) -> Result<()> {
1068    /// let _guard = ctx.logging_span().entered();
1069    /// info!("This message includes node_id automatically");
1070    /// debug!("So does this one");
1071    /// # Ok(())
1072    /// # }
1073    /// ```
1074    pub fn logging_span(&self) -> tracing::Span {
1075        tracing::info_span!("node", node_id = %self.node_id)
1076    }
1077
1078    /// Subscribe to all camera topics
1079    ///
1080    /// Convenience method for `subscribe_aggregated::<Image>("/sensor/camera/*")`
1081    ///
1082    /// # Example
1083    ///
1084    /// ```rust,no_run
1085    /// use mecha10::prelude::*;
1086    ///
1087    /// # async fn example(ctx: &Context) -> Result<()> {
1088    /// let mut cameras = ctx.subscribe_all_cameras().await?;
1089    /// while let Some((topic, image)) = cameras.recv().await {
1090    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1091    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1092    /// }
1093    /// # Ok(())
1094    /// # }
1095    /// ```
1096    #[cfg(feature = "messaging")]
1097    pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
1098        self.subscribe_aggregated("/sensor/camera/*").await
1099    }
1100
1101    /// Subscribe to all sensor topics
1102    ///
1103    /// Subscribes to all sensor data including cameras, LiDAR, IMU, odometry, etc.
1104    ///
1105    /// # Example
1106    ///
1107    /// ```rust,no_run
1108    /// use mecha10::prelude::*;
1109    ///
1110    /// # async fn example(ctx: &Context) -> Result<()> {
1111    /// // Subscribe to all sensors (returns untyped messages)
1112    /// let topics = ctx.discover_topics("/sensor/**").await?;
1113    /// println!("Found {} sensor topics", topics.len());
1114    /// # Ok(())
1115    /// # }
1116    /// ```
1117    #[cfg(feature = "messaging")]
1118    pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
1119        self.discover_topics("/sensor/**").await
1120    }
1121
1122    /// Subscribe to all LiDAR topics
1123    ///
1124    /// Convenience method for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`
1125    #[cfg(feature = "messaging")]
1126    pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
1127        self.subscribe_aggregated("/sensor/lidar/*").await
1128    }
1129
1130    /// Subscribe to all IMU topics
1131    ///
1132    /// Convenience method for `subscribe_aggregated::<IMU>("/sensor/imu/*")`
1133    #[cfg(feature = "messaging")]
1134    pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
1135        self.subscribe_aggregated("/sensor/imu/*").await
1136    }
1137
1138    /// Subscribe to all motor topics
1139    ///
1140    /// Convenience method for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`
1141    #[cfg(feature = "messaging")]
1142    pub async fn subscribe_all_motors(
1143        &self,
1144    ) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
1145        self.subscribe_aggregated("/actuator/motor/*").await
1146    }
1147
1148    /// Subscribe to a topic pattern with wildcard
1149    ///
1150    /// Generic helper for subscribing to multiple topics matching a pattern.
1151    ///
1152    /// # Example
1153    ///
1154    /// ```rust,no_run
1155    /// use mecha10::prelude::*;
1156    /// use mecha10::topics::sensor;
1157    ///
1158    /// # async fn example(ctx: &Context) -> Result<()> {
1159    /// // Subscribe to all instances of a specific topic type
1160    /// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
1161    /// # Ok(())
1162    /// # }
1163    /// ```
1164    #[cfg(feature = "messaging")]
1165    pub async fn subscribe_pattern<T>(
1166        &self,
1167        topic: crate::topics::Topic<T>,
1168        pattern: &str,
1169    ) -> Result<crate::aggregated::AggregatedReceiver<T>>
1170    where
1171        T: Message + DeserializeOwned + Send + 'static,
1172    {
1173        let full_pattern = format!("{}/{}", topic.path(), pattern);
1174        self.subscribe_aggregated(&full_pattern).await
1175    }
1176
1177    /// Get or create a state manager for this node
1178    ///
1179    /// The state manager provides persistent storage for node state across restarts.
1180    /// By default, it uses filesystem storage in `./state/` directory.
1181    ///
1182    /// # Example
1183    ///
1184    /// ```rust,no_run
1185    /// use mecha10::prelude::*;
1186    /// use serde::{Deserialize, Serialize};
1187    ///
1188    /// #[derive(Debug, Clone, Serialize, Deserialize)]
1189    /// struct MyState {
1190    ///     counter: u64,
1191    ///     position: (f64, f64),
1192    /// }
1193    ///
1194    /// # async fn example(ctx: &Context) -> Result<()> {
1195    /// let state_mgr = ctx.state_manager().await?;
1196    ///
1197    /// // Load previous state
1198    /// let mut state = state_mgr
1199    ///     .load::<MyState>("my_node")
1200    ///     .await?
1201    ///     .unwrap_or(MyState {
1202    ///         counter: 0,
1203    ///         position: (0.0, 0.0),
1204    ///     });
1205    ///
1206    /// // Update state
1207    /// state.counter += 1;
1208    ///
1209    /// // Save state
1210    /// state_mgr.save("my_node", &state).await?;
1211    /// # Ok(())
1212    /// # }
1213    /// ```
1214    pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1215        let mut mgr = self.state_manager.write().await;
1216
1217        if let Some(existing) = mgr.as_ref() {
1218            return Ok(Arc::clone(existing));
1219        }
1220
1221        // Create default filesystem-based state manager
1222        let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1223        let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1224        let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1225
1226        *mgr = Some(Arc::clone(&arc_mgr));
1227
1228        Ok(arc_mgr)
1229    }
1230
1231    /// Set a custom state manager for this context
1232    ///
1233    /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1234    ///
1235    /// # Example
1236    ///
1237    /// ```rust,no_run
1238    /// use mecha10::prelude::*;
1239    /// use std::sync::Arc;
1240    ///
1241    /// # async fn example(ctx: &Context) -> Result<()> {
1242    /// // Use in-memory state manager for testing
1243    /// let mem_mgr = Arc::new(MemoryStateManager::new());
1244    /// ctx.set_state_manager(mem_mgr).await;
1245    /// # Ok(())
1246    /// # }
1247    /// ```
1248    pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1249        let mut mgr = self.state_manager.write().await;
1250        *mgr = Some(manager);
1251    }
1252
1253    // ========================================
1254    // Topic Discovery Helpers
1255    // ========================================
1256
1257    /// List all sensor topics currently active in the system
1258    ///
1259    /// Returns all topics under `/sensor/*` including cameras, LiDAR, IMU, GPS, etc.
1260    ///
1261    /// # Example
1262    ///
1263    /// ```rust,no_run
1264    /// use mecha10::prelude::*;
1265    ///
1266    /// # async fn example(ctx: &Context) -> Result<()> {
1267    /// let sensors = ctx.list_sensor_topics().await?;
1268    /// println!("Available sensors:");
1269    /// for topic in sensors {
1270    ///     println!("  - {}", topic);
1271    /// }
1272    /// # Ok(())
1273    /// # }
1274    /// ```
1275    #[cfg(feature = "messaging")]
1276    pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
1277        self.discover_topics("/sensor/*").await
1278    }
1279
1280    /// List all actuator topics currently active in the system
1281    ///
1282    /// Returns all topics under `/actuator/*` including motors, servos, grippers, etc.
1283    ///
1284    /// # Example
1285    ///
1286    /// ```rust,no_run
1287    /// use mecha10::prelude::*;
1288    ///
1289    /// # async fn example(ctx: &Context) -> Result<()> {
1290    /// let actuators = ctx.list_actuator_topics().await?;
1291    /// println!("Available actuators:");
1292    /// for topic in actuators {
1293    ///     println!("  - {}", topic);
1294    /// }
1295    /// # Ok(())
1296    /// # }
1297    /// ```
1298    #[cfg(feature = "messaging")]
1299    pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
1300        self.discover_topics("/actuator/*").await
1301    }
1302
1303    /// List all topics in the system
1304    ///
1305    /// Returns all active topics regardless of category.
1306    ///
1307    /// # Example
1308    ///
1309    /// ```rust,no_run
1310    /// use mecha10::prelude::*;
1311    ///
1312    /// # async fn example(ctx: &Context) -> Result<()> {
1313    /// let all_topics = ctx.list_all_topics().await?;
1314    /// println!("All active topics ({} total):", all_topics.len());
1315    /// for topic in all_topics {
1316    ///     println!("  - {}", topic);
1317    /// }
1318    /// # Ok(())
1319    /// # }
1320    /// ```
1321    #[cfg(feature = "messaging")]
1322    pub async fn list_all_topics(&self) -> Result<Vec<String>> {
1323        self.discover_topics("*").await
1324    }
1325
1326    /// List all camera topics
1327    ///
1328    /// Returns all topics under `/sensor/camera/*`.
1329    ///
1330    /// # Example
1331    ///
1332    /// ```rust,no_run
1333    /// use mecha10::prelude::*;
1334    ///
1335    /// # async fn example(ctx: &Context) -> Result<()> {
1336    /// let cameras = ctx.list_camera_topics().await?;
1337    /// println!("Available cameras:");
1338    /// for topic in cameras {
1339    ///     println!("  - {}", topic);
1340    /// }
1341    /// # Ok(())
1342    /// # }
1343    /// ```
1344    #[cfg(feature = "messaging")]
1345    pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
1346        self.discover_topics("/sensor/camera/*").await
1347    }
1348
1349    /// List all LiDAR topics
1350    ///
1351    /// Returns all topics under `/sensor/lidar/*`.
1352    #[cfg(feature = "messaging")]
1353    pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
1354        self.discover_topics("/sensor/lidar/*").await
1355    }
1356
1357    /// List all IMU topics
1358    ///
1359    /// Returns all topics under `/sensor/imu/*`.
1360    #[cfg(feature = "messaging")]
1361    pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
1362        self.discover_topics("/sensor/imu/*").await
1363    }
1364
1365    /// List all motor topics
1366    ///
1367    /// Returns all topics under `/actuator/motor/*`.
1368    #[cfg(feature = "messaging")]
1369    pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
1370        self.discover_topics("/actuator/motor/*").await
1371    }
1372
1373    /// Print a formatted summary of all active topics
1374    ///
1375    /// Useful for debugging and introspection. Groups topics by category
1376    /// and displays them in a human-readable format.
1377    ///
1378    /// # Example
1379    ///
1380    /// ```rust,no_run
1381    /// use mecha10::prelude::*;
1382    ///
1383    /// # async fn example(ctx: &Context) -> Result<()> {
1384    /// ctx.print_topic_summary().await?;
1385    /// // Output:
1386    /// // === Active Topics ===
1387    /// // Sensors (3):
1388    /// //   - /sensor/camera/front
1389    /// //   - /sensor/lidar/main
1390    /// //   - /sensor/imu/base
1391    /// // Actuators (2):
1392    /// //   - /actuator/motor/left
1393    /// //   - /actuator/motor/right
1394    /// // =====================
1395    /// # Ok(())
1396    /// # }
1397    /// ```
1398    #[cfg(feature = "messaging")]
1399    pub async fn print_topic_summary(&self) -> Result<()> {
1400        let sensors = self.list_sensor_topics().await.unwrap_or_default();
1401        let actuators = self.list_actuator_topics().await.unwrap_or_default();
1402        let all_topics = self.list_all_topics().await?;
1403
1404        let other_topics: Vec<String> = all_topics
1405            .into_iter()
1406            .filter(|t| !t.starts_with("/sensor/") && !t.starts_with("/actuator/"))
1407            .collect();
1408
1409        println!("=== Active Topics ===");
1410
1411        if !sensors.is_empty() {
1412            println!("Sensors ({}):", sensors.len());
1413            for topic in sensors {
1414                println!("  - {}", topic);
1415            }
1416        }
1417
1418        if !actuators.is_empty() {
1419            println!("Actuators ({}):", actuators.len());
1420            for topic in actuators {
1421                println!("  - {}", topic);
1422            }
1423        }
1424
1425        if !other_topics.is_empty() {
1426            println!("Other ({}):", other_topics.len());
1427            for topic in other_topics {
1428                println!("  - {}", topic);
1429            }
1430        }
1431
1432        println!("=====================");
1433
1434        Ok(())
1435    }
1436}