mecha10_core/context/
mod.rs

1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
6//! This module has been refactored into smaller sub-modules for better organization:
7//! - `infra_config`: Infrastructure configuration types
8//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27// Import the complete Context struct and all implementations from the backup file
28// This maintains backward compatibility while improving code organization
29
30pub struct Context {
31    /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
32    node_id: String,
33
34    /// Optional instance name
35    instance: Option<String>,
36
37    /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
38    #[cfg(feature = "messaging")]
39    message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,
40
41    /// Control plane message bus for robot-to-cloud communication (optional)
42    #[cfg(feature = "messaging")]
43    control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,
44
45    /// Redis bridge for forwarding messages between local and control plane
46    #[cfg(feature = "messaging")]
47    redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,
48
49    /// Shutdown signal
50    shutdown: Arc<RwLock<bool>>,
51
52    /// State manager for persistent node state (lazy initialized)
53    state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
54}
55impl Clone for Context {
56    fn clone(&self) -> Self {
57        Self {
58            node_id: self.node_id.clone(),
59            instance: self.instance.clone(),
60            #[cfg(feature = "messaging")]
61            message_bus: Arc::clone(&self.message_bus),
62            #[cfg(feature = "messaging")]
63            control_plane_bus: Arc::clone(&self.control_plane_bus),
64            #[cfg(feature = "messaging")]
65            redis_bridge: Arc::clone(&self.redis_bridge),
66            shutdown: Arc::clone(&self.shutdown),
67            state_manager: Arc::clone(&self.state_manager),
68        }
69    }
70}
71impl Context {
72    /// Create a new context
73    ///
74    /// # Arguments
75    ///
76    /// * `node_id` - Unique identifier for this node (fallback if NODE_NAME env var not set)
77    ///
78    /// # Environment Variables
79    ///
80    /// * `NODE_NAME` - If set, overrides the `node_id` parameter. This allows node-runner
81    ///   to set full identifiers like `@mecha10/speaker` for consistent naming across
82    ///   the CLI, dashboard, and health reporting.
83    ///
84    /// # Example
85    ///
86    /// ```rust
87    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
88    /// let ctx = Context::new("my-node").await?;
89    /// # Ok(())
90    /// # }
91    /// ```
92    pub async fn new(node_id: &str) -> Result<Self> {
93        // Use NODE_NAME from environment if set (for consistent naming with node-runner)
94        // This allows bundled nodes to report health with full identifiers like @mecha10/speaker
95        let node_id = std::env::var("NODE_NAME").unwrap_or_else(|_| node_id.to_string());
96        #[cfg(feature = "messaging")]
97        let (message_bus, control_plane_bus, redis_bridge) = {
98            // Load infrastructure config
99            let config = Self::load_infrastructure_config()?;
100
101            // Connect to local Redis
102            let redis_url = Self::get_redis_url_from_config(&config)?;
103            let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, &node_id)
104                .await
105                .with_context(|| {
106                    format!(
107                        "Failed to connect local message bus for node '{}' to Redis at '{}'",
108                        node_id, redis_url
109                    )
110                })
111                .map_err(|e| Mecha10Error::MessagingError {
112                    message: format!("{:#}", e),
113                    suggestion: "Ensure local Redis is running at the configured URL".to_string(),
114                })?;
115
116            // Connect to control plane Redis if configured
117            let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
118                let cp_url = &cp_config.url;
119                match mecha10_messaging::MessageBus::connect(cp_url, &node_id).await {
120                    Ok(bus) => {
121                        tracing::info!(
122                            "Connected to control plane Redis at '{}' for node '{}'",
123                            cp_url,
124                            node_id
125                        );
126                        Some(bus)
127                    }
128                    Err(e) => {
129                        tracing::warn!(
130                            "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
131                            cp_url,
132                            e
133                        );
134                        None
135                    }
136                }
137            } else {
138                None
139            };
140
141            // Initialize Redis bridge if both connections are available
142            let bridge = if let Some(_cp_bus_ref) = &cp_bus {
143                // Get bridge configuration (use default if not specified)
144                let bridge_config = config.bridge.unwrap_or_default();
145
146                if bridge_config.enabled {
147                    // Create bridge with separate clones of the buses
148                    let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, &node_id)
149                        .await
150                        .map_err(|e| Mecha10Error::MessagingError {
151                            message: format!("Failed to create bridge connection to local Redis: {}", e),
152                            suggestion: "Check local Redis connection".to_string(),
153                        })?;
154
155                    let cp_url = config
156                        .control_plane_redis
157                        .as_ref()
158                        .map(|c| c.url.clone())
159                        .unwrap_or_default();
160                    let bridge_cp_bus = mecha10_messaging::MessageBus::connect(&cp_url, &node_id)
161                        .await
162                        .map_err(|e| Mecha10Error::MessagingError {
163                            message: format!("Failed to create bridge connection to control plane Redis: {}", e),
164                            suggestion: "Check control plane Redis connection".to_string(),
165                        })?;
166
167                    // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
168                    let redis_bridge_config = crate::redis_bridge::BridgeConfig {
169                        enabled: bridge_config.enabled,
170                        local_topics: bridge_config.local_topics,
171                        bridged_topics: bridge_config.bridged_topics,
172                        robot_id: bridge_config.robot_id,
173                    };
174
175                    let bridge =
176                        crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
177
178                    // Start the bridge
179                    if let Err(e) = bridge.start().await {
180                        tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
181                        None
182                    } else {
183                        tracing::info!("Redis bridge started successfully");
184                        Some(bridge)
185                    }
186                } else {
187                    tracing::debug!("Redis bridge is disabled in configuration");
188                    None
189                }
190            } else {
191                None
192            };
193
194            (local_bus, cp_bus, bridge)
195        };
196
197        Ok(Self {
198            node_id,
199            instance: None,
200            #[cfg(feature = "messaging")]
201            message_bus: Arc::new(RwLock::new(message_bus)),
202            #[cfg(feature = "messaging")]
203            control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
204            #[cfg(feature = "messaging")]
205            redis_bridge: Arc::new(RwLock::new(redis_bridge)),
206            shutdown: Arc::new(RwLock::new(false)),
207            state_manager: Arc::new(RwLock::new(None)),
208        })
209    }
210
211    /// Set an instance name for this context (builder pattern)
212    ///
213    /// This allows the same node implementation to run multiple instances
214    /// with different names. The node_id becomes "{node_name}/{instance}".
215    ///
216    /// # Example
217    ///
218    /// ```rust
219    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
220    /// // Single instance (no instance name)
221    /// let ctx = Context::new("camera_fake").await?;
222    /// // node_id: "camera_fake"
223    ///
224    /// // Multi-instance (with instance names)
225    /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
226    /// // node_id: "camera_fake/left"
227    ///
228    /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
229    /// // node_id: "camera_fake/right"
230    /// # Ok(())
231    /// # }
232    /// ```
233    pub fn with_instance(mut self, instance: &str) -> Self {
234        self.instance = Some(instance.to_string());
235        self.node_id = format!("{}/{}", self.node_id, instance);
236        self
237    }
238
239    /// Get the instance name if set
240    ///
241    /// Returns `None` for single-instance nodes, or `Some(instance)` for
242    /// multi-instance nodes.
243    pub fn instance(&self) -> Option<&str> {
244        self.instance.as_deref()
245    }
246
247    /// Load infrastructure configuration based on environment
248    ///
249    /// Priority order:
250    /// 1. mecha10.json (project configuration) - redis.url field
251    /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
252    /// 3. config/infrastructure.<environment>.yaml
253    ///
254    /// Set via MECHA10_ENVIRONMENT environment variable.
255    fn load_infrastructure_config() -> Result<InfrastructureConfig> {
256        use crate::config::load_config;
257        use std::env;
258        use std::path::PathBuf;
259
260        // 1. Try loading from mecha10.json FIRST (project configuration)
261        if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
262            return Ok(InfrastructureConfig {
263                redis: crate::context::infra_config::RedisConfig {
264                    url: redis_url,
265                    max_connections: 10,
266                    connection_timeout_ms: 5000,
267                },
268                control_plane_redis: None,
269                bridge: None,
270                postgres: None,
271                mlflow: None,
272                minio: None,
273            });
274        }
275
276        // 2. Check for environment variable overrides
277        // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
278        if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
279            return Ok(InfrastructureConfig {
280                redis: crate::context::infra_config::RedisConfig {
281                    url: redis_url,
282                    max_connections: 10,
283                    connection_timeout_ms: 5000,
284                },
285                control_plane_redis: None,
286                bridge: None,
287                postgres: None,
288                mlflow: None,
289                minio: None,
290            });
291        }
292
293        // 3. Load from infrastructure.yaml
294        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
295
296        let config_filename = format!("infrastructure.{}.yaml", environment);
297
298        // Try multiple paths in order:
299        // 1. Current directory (for running from workspace root)
300        // 2. Workspace root (for running tests from target/debug/deps)
301        // 3. Two levels up (for running tests from packages/core/target/debug/deps)
302        let paths_to_try = vec![
303            PathBuf::from(format!("config/{}", config_filename)),
304            PathBuf::from(format!("../../config/{}", config_filename)),
305            PathBuf::from(format!("../../../../config/{}", config_filename)),
306        ];
307
308        for path in &paths_to_try {
309            if path.exists() {
310                return load_config(path)
311                    .with_context(|| {
312                        format!(
313                            "Failed to load infrastructure config for environment '{}' from '{}'",
314                            environment,
315                            path.display()
316                        )
317                    })
318                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
319            }
320        }
321
322        // If no config file found, return error with helpful message
323        Err(Mecha10Error::Configuration(format!(
324            "Failed to load infrastructure config for environment '{}'. \
325             Tried paths: {}. \
326             Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
327             or configure redis.url in mecha10.json.",
328            environment,
329            paths_to_try
330                .iter()
331                .map(|p| p.display().to_string())
332                .collect::<Vec<_>>()
333                .join(", ")
334        )))
335    }
336
337    /// Try to load Redis URL from mecha10.json
338    ///
339    /// Looks for mecha10.json in current directory and reads redis.url field.
340    fn try_load_redis_from_mecha10_json() -> Result<String> {
341        use std::path::PathBuf;
342
343        // Try multiple paths to find mecha10.json
344        let paths_to_try = vec![
345            PathBuf::from("mecha10.json"),
346            PathBuf::from("../mecha10.json"),
347            PathBuf::from("../../mecha10.json"),
348        ];
349
350        for path in &paths_to_try {
351            if !path.exists() {
352                continue;
353            }
354
355            // Read and parse mecha10.json
356            let content = std::fs::read_to_string(path)
357                .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
358
359            let json: serde_json::Value = serde_json::from_str(&content)
360                .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
361
362            // Extract redis.url
363            if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
364                return Ok(redis_url.to_string());
365            }
366        }
367
368        Err(Mecha10Error::Configuration(
369            "No mecha10.json found with redis.url configuration".to_string(),
370        ))
371    }
372
373    /// Get Redis URL from infrastructure configuration (helper for internal use)
374    fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
375        // First try environment variables (for backward compatibility and testing)
376        if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
377            return Ok(url);
378        }
379        if let Ok(url) = std::env::var("REDIS_URL") {
380            return Ok(url);
381        }
382
383        // Use config
384        Ok(config.redis.url.clone())
385    }
386
387    /// Get Redis URL from infrastructure configuration
388    pub fn get_redis_url() -> Result<String> {
389        let config = Self::load_infrastructure_config()?;
390        Self::get_redis_url_from_config(&config)
391    }
392
393    /// Get the node ID
394    pub fn node_id(&self) -> &str {
395        &self.node_id
396    }
397
398    /// Load node configuration with environment-aware file-level fallback
399    ///
400    /// This method implements the V2 configuration system with:
401    /// - Environment-specific configs (dev/staging/production)
402    /// - File-level fallback (no key-by-key merging)
403    /// - Automatic path resolution
404    ///
405    /// **Fallback Priority:**
406    /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
407    /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
408    /// 3. `T::default()` (code default)
409    ///
410    /// **Environment Detection:**
411    /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
412    /// - Supported values: "dev", "staging", "production", or custom
413    ///
414    /// # Type Parameters
415    ///
416    /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
417    ///
418    /// # Arguments
419    ///
420    /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
421    ///
422    /// # Returns
423    ///
424    /// The loaded configuration of type `T`, using the first available source
425    /// in the fallback chain.
426    ///
427    /// # Fallback Chain
428    ///
429    /// Configs are searched in order:
430    /// 1. `configs/{env}/nodes/{node_name}/config.json` - Environment-specific
431    /// 2. `configs/common/nodes/{node_name}/config.json` - Common config
432    /// 3. `configs/nodes/@mecha10/{node_name}/config.json` - Framework nodes
433    /// 4. `configs/nodes/{node_name}/config.json` - Project nodes (preferred)
434    /// 5. `configs/nodes/@local/{node_name}/config.json` - Legacy format
435    /// 6. `T::default()` - Default implementation
436    ///
437    /// # Example
438    ///
439    /// ```rust
440    /// use mecha10::prelude::*;
441    /// use serde::{Deserialize, Serialize};
442    ///
443    /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
444    /// struct SimBridgeConfig {
445    ///     camera_width: u32,
446    ///     camera_height: u32,
447    ///     camera_fps: u32,
448    /// }
449    ///
450    /// # async fn example(ctx: &Context) -> Result<()> {
451    /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
452    /// # Ok(())
453    /// # }
454    /// ```
455    pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
456    where
457        T: DeserializeOwned + Default,
458    {
459        use crate::config::load_config;
460        use std::env;
461        use std::path::PathBuf;
462
463        // Get environment from env var (default: "dev")
464        let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
465
466        // Try multiple base paths (for running from different locations)
467        let base_paths = vec![
468            PathBuf::from("."),           // Current directory
469            PathBuf::from("../.."),       // From target/debug/deps
470            PathBuf::from("../../../.."), // From packages/*/target/debug/deps
471        ];
472
473        // Build fallback chain for each base path
474        for base_path in &base_paths {
475            // 1. Try environment-specific config
476            let env_config_path = base_path
477                .join("configs")
478                .join(&environment)
479                .join("nodes")
480                .join(node_name)
481                .join("config.json");
482
483            if env_config_path.exists() {
484                tracing::debug!(
485                    "Loading node config for '{}' from environment-specific path: {}",
486                    node_name,
487                    env_config_path.display()
488                );
489                return load_config(&env_config_path)
490                    .with_context(|| {
491                        format!(
492                            "Failed to load node config for '{}' from '{}'",
493                            node_name,
494                            env_config_path.display()
495                        )
496                    })
497                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
498            }
499
500            // 2. Try common config
501            let common_config_path = base_path
502                .join("configs")
503                .join("common")
504                .join("nodes")
505                .join(node_name)
506                .join("config.json");
507
508            if common_config_path.exists() {
509                tracing::debug!(
510                    "Loading node config for '{}' from common path: {}",
511                    node_name,
512                    common_config_path.display()
513                );
514                return load_config(&common_config_path)
515                    .with_context(|| {
516                        format!(
517                            "Failed to load node config for '{}' from '{}'",
518                            node_name,
519                            common_config_path.display()
520                        )
521                    })
522                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
523            }
524
525            // 3. Try new flat structure: configs/nodes/@mecha10/{node_name}/config.json
526            // This format supports environment-aware configs: { "dev": {...}, "production": {...} }
527            let flat_config_path = base_path
528                .join("configs")
529                .join("nodes")
530                .join("@mecha10")
531                .join(node_name)
532                .join("config.json");
533
534            if flat_config_path.exists() {
535                tracing::debug!(
536                    "Loading node config for '{}' from flat path: {}",
537                    node_name,
538                    flat_config_path.display()
539                );
540                return Self::load_environment_aware_config(&flat_config_path, &environment)
541                    .with_context(|| {
542                        format!(
543                            "Failed to load node config for '{}' from '{}'",
544                            node_name,
545                            flat_config_path.display()
546                        )
547                    })
548                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
549            }
550
551            // 4. Try plain project nodes: configs/nodes/{node_name}/config.json
552            // This is the preferred format for project-local nodes
553            let project_config_path = base_path
554                .join("configs")
555                .join("nodes")
556                .join(node_name)
557                .join("config.json");
558
559            if project_config_path.exists() {
560                tracing::debug!(
561                    "Loading node config for '{}' from project path: {}",
562                    node_name,
563                    project_config_path.display()
564                );
565                return Self::load_environment_aware_config(&project_config_path, &environment)
566                    .with_context(|| {
567                        format!(
568                            "Failed to load node config for '{}' from '{}'",
569                            node_name,
570                            project_config_path.display()
571                        )
572                    })
573                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
574            }
575
576            // 5. Try @local nodes (legacy): configs/nodes/@local/{node_name}/config.json
577            let local_config_path = base_path
578                .join("configs")
579                .join("nodes")
580                .join("@local")
581                .join(node_name)
582                .join("config.json");
583
584            if local_config_path.exists() {
585                tracing::debug!(
586                    "Loading node config for '{}' from legacy local path: {}",
587                    node_name,
588                    local_config_path.display()
589                );
590                return Self::load_environment_aware_config(&local_config_path, &environment)
591                    .with_context(|| {
592                        format!(
593                            "Failed to load node config for '{}' from '{}'",
594                            node_name,
595                            local_config_path.display()
596                        )
597                    })
598                    .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
599            }
600        }
601
602        // 3. Fall back to default
603        tracing::debug!(
604            "No config file found for node '{}', using Default implementation",
605            node_name
606        );
607        Ok(T::default())
608    }
609
610    /// Load a config file that may be in environment-aware format.
611    ///
612    /// This handles configs with the format:
613    /// ```json
614    /// {
615    ///   "dev": { ... config ... },
616    ///   "production": { ... config ... }
617    /// }
618    /// ```
619    ///
620    /// Falls back to direct deserialization for backwards compatibility with
621    /// configs that don't use the environment-aware format.
622    fn load_environment_aware_config<T>(path: &std::path::Path, environment: &str) -> Result<T>
623    where
624        T: DeserializeOwned + Default,
625    {
626        let content =
627            std::fs::read_to_string(path).with_context(|| format!("Failed to read config file: {}", path.display()))?;
628
629        // First, try to parse as a generic JSON value to check structure
630        let json_value: serde_json::Value =
631            serde_json::from_str(&content).with_context(|| format!("Failed to parse JSON from: {}", path.display()))?;
632
633        // Check if this is an environment-aware config (has "dev" or "production" keys)
634        if let Some(obj) = json_value.as_object() {
635            if obj.contains_key("dev") || obj.contains_key("production") {
636                // This is an environment-aware config, extract the right section
637                let env_key = match environment {
638                    "production" | "prod" => "production",
639                    _ => "dev",
640                };
641
642                if let Some(env_section) = obj.get(env_key) {
643                    tracing::debug!("Loading '{}' environment section from: {}", env_key, path.display());
644                    return serde_json::from_value(env_section.clone())
645                        .with_context(|| {
646                            format!("Failed to deserialize '{}' section from: {}", env_key, path.display())
647                        })
648                        .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
649                } else {
650                    // Requested environment not found, try the other one
651                    let fallback_key = if env_key == "dev" { "production" } else { "dev" };
652                    if let Some(fallback_section) = obj.get(fallback_key) {
653                        tracing::warn!(
654                            "Environment '{}' not found in config, falling back to '{}': {}",
655                            env_key,
656                            fallback_key,
657                            path.display()
658                        );
659                        return serde_json::from_value(fallback_section.clone())
660                            .with_context(|| {
661                                format!(
662                                    "Failed to deserialize '{}' section from: {}",
663                                    fallback_key,
664                                    path.display()
665                                )
666                            })
667                            .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
668                    }
669                }
670            }
671        }
672
673        // Not an environment-aware config, deserialize directly for backwards compatibility
674        tracing::debug!("Config is not environment-aware, loading directly: {}", path.display());
675        serde_json::from_value(json_value)
676            .with_context(|| format!("Failed to deserialize config from: {}", path.display()))
677            .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)))
678    }
679
680    /// Check if control plane Redis connection is available
681    #[cfg(feature = "messaging")]
682    pub async fn has_control_plane(&self) -> bool {
683        self.control_plane_bus.read().await.is_some()
684    }
685
686    /// Get control plane Redis URL from infrastructure configuration
687    pub fn get_control_plane_redis_url() -> Result<Option<String>> {
688        let config = Self::load_infrastructure_config()?;
689        Ok(config.control_plane_redis.map(|c| c.url))
690    }
691
692    /// Subscribe to a topic with type-safe message handling
693    ///
694    /// # Arguments
695    ///
696    /// * `topic` - Type-safe topic to subscribe to
697    ///
698    /// # Returns
699    ///
700    /// A receiver that will yield messages of type `T`
701    ///
702    /// # Example
703    ///
704    /// ```rust
705    /// use mecha10::prelude::*;
706    /// use mecha10::topics::sensor;
707    ///
708    /// # async fn example(ctx: &Context) -> Result<()> {
709    /// // Type is inferred as Receiver<Image>
710    /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
711    ///
712    /// while let Some(image) = images.recv().await {
713    ///     println!("Image: {}x{}", image.width, image.height);
714    /// }
715    /// # Ok(())
716    /// # }
717    /// ```
718    #[cfg(feature = "messaging")]
719    pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
720    where
721        T: Message + DeserializeOwned + Send + 'static,
722    {
723        let mut bus = self.message_bus.write().await;
724
725        // Create a unique consumer group for this node
726        let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
727
728        let subscriber = bus
729            .subscribe::<T>(topic.path(), &consumer_group)
730            .await
731            .with_context(|| {
732                format!(
733                    "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
734                    topic.path(),
735                    consumer_group,
736                    self.node_id
737                )
738            })
739            .map_err(|e| Mecha10Error::MessagingError {
740                message: format!("{:#}", e),
741                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
742            })?;
743
744        // Convert to our Receiver type
745        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
746
747        // Spawn task to forward messages
748        tokio::spawn(async move {
749            let mut sub = subscriber;
750            while let Some(msg) = sub.recv().await {
751                // Auto-acknowledge message before moving payload
752                if let Err(e) = msg.ack().await {
753                    tracing::warn!(
754                        topic = %msg.topic,
755                        message_id = %msg.id,
756                        error = %e,
757                        "Failed to acknowledge message (will be redelivered on reconnect)"
758                    );
759                }
760                // Extract payload and forward
761                if tx.send(msg.payload).is_err() {
762                    break; // Receiver dropped
763                }
764            }
765        });
766
767        Ok(Receiver::from(rx))
768    }
769
770    /// Publish a message to a specific topic
771    ///
772    /// # Arguments
773    ///
774    /// * `topic` - Type-safe topic to publish to
775    /// * `message` - Message to publish (type checked against topic)
776    ///
777    /// # Example
778    ///
779    /// ```rust
780    /// use mecha10::prelude::*;
781    /// use mecha10::topics::sensor;
782    /// use mecha10::messages::Image;
783    ///
784    /// # async fn example(ctx: &Context) -> Result<()> {
785    /// let image = Image {
786    ///     timestamp: now_micros(),
787    ///     width: 640,
788    ///     height: 480,
789    ///     encoding: "rgb8".to_string(),
790    ///     data: vec![],
791    /// };
792    ///
793    /// // Type checked: image must be of type Image
794    /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
795    /// # Ok(())
796    /// # }
797    /// ```
798    #[cfg(feature = "messaging")]
799    pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
800    where
801        T: Message + Serialize,
802    {
803        let mut bus = self.message_bus.write().await;
804
805        bus.publish(topic.path(), message)
806            .await
807            .with_context(|| {
808                format!(
809                    "Failed to publish to topic '{}' from node '{}'",
810                    topic.path(),
811                    self.node_id
812                )
813            })
814            .map_err(|e| Mecha10Error::MessagingError {
815                message: format!("{:#}", e),
816                suggestion: "Check that Redis is running and accepting connections".to_string(),
817            })?;
818
819        Ok(())
820    }
821
822    /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
823    ///
824    /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
825    ///
826    /// # Arguments
827    /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
828    /// * `message` - The message to publish
829    pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
830    where
831        T: Message + Serialize,
832    {
833        let mut bus = self.message_bus.write().await;
834
835        bus.publish(topic_path, message)
836            .await
837            .with_context(|| {
838                format!(
839                    "Failed to publish to topic '{}' from node '{}'",
840                    topic_path, self.node_id
841                )
842            })
843            .map_err(|e| Mecha10Error::MessagingError {
844                message: format!("{:#}", e),
845                suggestion: "Check that Redis is running and accepting connections".to_string(),
846            })?;
847
848        Ok(())
849    }
850
851    /// Publish a message to an instance-scoped topic
852    ///
853    /// This automatically appends the context's instance name to the topic path.
854    /// If no instance is set, this behaves like `publish_to()`.
855    ///
856    /// # Arguments
857    ///
858    /// * `topic` - Type-safe topic to publish to
859    /// * `message` - Message to publish (type checked against topic)
860    ///
861    /// # Example
862    ///
863    /// ```rust
864    /// use mecha10::prelude::*;
865    /// use mecha10::topics::sensor;
866    /// use mecha10::messages::Image;
867    ///
868    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
869    /// let ctx = Context::new("camera_fake").await?.with_instance("left");
870    ///
871    /// let image = Image {
872    ///     timestamp: now_micros(),
873    ///     width: 640,
874    ///     height: 480,
875    ///     encoding: "rgb8".to_string(),
876    ///     data: vec![],
877    /// };
878    ///
879    /// // Publishes to "/sensor/camera/rgb/left"
880    /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
881    /// # Ok(())
882    /// # }
883    /// ```
884    #[cfg(feature = "messaging")]
885    pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
886    where
887        T: Message + Serialize,
888    {
889        let scoped_path = if let Some(instance) = &self.instance {
890            format!("{}/{}", topic.path(), instance)
891        } else {
892            topic.path().to_string()
893        };
894
895        self.publish_raw(&scoped_path, message).await
896    }
897
898    /// Publish a message to a topic with an explicit instance suffix
899    ///
900    /// This allows publishing to a specific instance's topic regardless of
901    /// the context's own instance name.
902    ///
903    /// # Arguments
904    ///
905    /// * `topic` - Type-safe topic to publish to
906    /// * `instance` - Instance name to append to topic path
907    /// * `message` - Message to publish (type checked against topic)
908    ///
909    /// # Example
910    ///
911    /// ```rust
912    /// use mecha10::prelude::*;
913    /// use mecha10::topics::actuator;
914    ///
915    /// # async fn example(ctx: &Context) -> Result<()> {
916    /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
917    ///
918    /// // Send to specific motor instances
919    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
920    /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
921    /// # Ok(())
922    /// # }
923    /// ```
924    #[cfg(feature = "messaging")]
925    pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
926    where
927        T: Message + Serialize,
928    {
929        let instance_path = format!("{}/{}", topic.path(), instance);
930        self.publish_raw(&instance_path, message).await
931    }
932
933    /// Subscribe to an instance-scoped topic
934    ///
935    /// This automatically appends the context's instance name to the topic path.
936    /// If no instance is set, this behaves like `subscribe()`.
937    ///
938    /// # Example
939    ///
940    /// ```rust
941    /// use mecha10::prelude::*;
942    /// use mecha10::topics::sensor;
943    ///
944    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
945    /// let ctx = Context::new("vision_node").await?;
946    ///
947    /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
948    /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
949    ///
950    /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
951    /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
952    /// # Ok(())
953    /// # }
954    /// ```
955    #[cfg(feature = "messaging")]
956    pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
957    where
958        T: Message + DeserializeOwned + Send + 'static,
959    {
960        let scoped_path = format!("{}/{}", topic.path(), instance);
961        self.subscribe_raw(&scoped_path).await
962    }
963
964    /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
965    ///
966    /// # Arguments
967    ///
968    /// * `topic_path` - Raw topic path string
969    ///
970    /// # Returns
971    ///
972    /// A receiver that will yield messages of type `T`
973    ///
974    /// # Note
975    ///
976    /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
977    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
978    #[cfg(feature = "messaging")]
979    pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
980    where
981        T: Message + DeserializeOwned + Send + 'static,
982    {
983        let mut bus = self.message_bus.write().await;
984
985        // Create a unique consumer group for this node
986        let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
987
988        let subscriber = bus
989            .subscribe::<T>(topic_path, &consumer_group)
990            .await
991            .with_context(|| {
992                format!(
993                    "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
994                    topic_path, consumer_group, self.node_id
995                )
996            })
997            .map_err(|e| Mecha10Error::MessagingError {
998                message: format!("{:#}", e),
999                suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
1000            })?;
1001
1002        // Convert to our Receiver type
1003        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1004
1005        // Spawn task to forward messages
1006        tokio::spawn(async move {
1007            let mut sub = subscriber;
1008            while let Some(msg) = sub.recv().await {
1009                // Auto-acknowledge message before moving payload
1010                if let Err(e) = msg.ack().await {
1011                    tracing::warn!(
1012                        topic = %msg.topic,
1013                        message_id = %msg.id,
1014                        error = %e,
1015                        "Failed to acknowledge message (will be redelivered on reconnect)"
1016                    );
1017                }
1018                // Extract payload and forward
1019                if tx.send(msg.payload).is_err() {
1020                    break; // Receiver dropped
1021                }
1022            }
1023        });
1024
1025        Ok(Receiver::from(rx))
1026    }
1027
1028    /// Discover topics matching a glob pattern
1029    ///
1030    /// Queries Redis for all topics matching the given pattern without subscribing.
1031    /// Useful for inspecting available topics before subscribing.
1032    ///
1033    /// # Pattern Syntax
1034    ///
1035    /// - `*` matches any sequence of characters within a path segment
1036    /// - `**` matches any sequence of path segments
1037    /// - `{a,b}` matches either a or b (future enhancement)
1038    ///
1039    /// # Arguments
1040    ///
1041    /// * `pattern` - Glob pattern to match topics against
1042    ///
1043    /// # Returns
1044    ///
1045    /// A vector of topic paths that match the pattern
1046    ///
1047    /// # Example
1048    ///
1049    /// ```rust,no_run
1050    /// use mecha10::prelude::*;
1051    ///
1052    /// # async fn example(ctx: &Context) -> Result<()> {
1053    /// // Discover all camera topics
1054    /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
1055    /// println!("Found {} camera topics: {:?}", topics.len(), topics);
1056    /// # Ok(())
1057    /// # }
1058    /// ```
1059    #[cfg(feature = "messaging")]
1060    pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
1061        use crate::aggregated::pattern::convert_glob_to_redis;
1062
1063        let bus = self.message_bus.read().await;
1064        let redis_pattern = convert_glob_to_redis(pattern);
1065
1066        bus.discover_topics(&redis_pattern)
1067            .await
1068            .with_context(|| {
1069                format!(
1070                    "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
1071                    pattern, redis_pattern, self.node_id
1072                )
1073            })
1074            .map_err(|e| Mecha10Error::MessagingError {
1075                message: format!("{:#}", e),
1076                suggestion: "Check that Redis is running and the pattern is valid".to_string(),
1077            })
1078    }
1079
1080    /// Subscribe to multiple topics matching a glob pattern
1081    ///
1082    /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
1083    /// Each message is tagged with its source topic path.
1084    ///
1085    /// # Pattern Syntax
1086    ///
1087    /// - `*` matches any sequence of characters within a path segment
1088    /// - `**` matches any sequence of path segments
1089    /// - `{a,b}` matches either a or b (future enhancement)
1090    ///
1091    /// # Arguments
1092    ///
1093    /// * `pattern` - Glob pattern to match topics against
1094    ///
1095    /// # Returns
1096    ///
1097    /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
1098    ///
1099    /// # Examples
1100    ///
1101    /// ```rust,no_run
1102    /// use mecha10::prelude::*;
1103    ///
1104    /// # async fn example(ctx: &Context) -> Result<()> {
1105    /// // Subscribe to all cameras with one call
1106    /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
1107    ///
1108    /// while let Some((topic, image)) = cameras.recv().await {
1109    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1110    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1111    /// }
1112    /// # Ok(())
1113    /// # }
1114    /// ```
1115    #[cfg(feature = "messaging")]
1116    pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
1117    where
1118        T: Message + DeserializeOwned + Send + 'static,
1119    {
1120        // 1. Discover all topics matching pattern
1121        let matching_topics = self.discover_topics(pattern).await?;
1122
1123        if matching_topics.is_empty() {
1124            return Err(Mecha10Error::MessagingError {
1125                message: format!("No topics found matching pattern: {}", pattern),
1126                suggestion: "Check that the pattern is correct and topics exist".to_string(),
1127            });
1128        }
1129
1130        // 2. Subscribe to each topic
1131        let mut receivers = Vec::new();
1132        for topic_path in matching_topics {
1133            let rx = self.subscribe_raw::<T>(&topic_path).await?;
1134            // Extract receiver and wrap in ReceiverType enum
1135            let receiver_type = match rx.inner {
1136                crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
1137                    crate::aggregated::ReceiverType::Unbounded(unbounded)
1138                }
1139                crate::context::receiver::ReceiverInner::Bounded(bounded) => {
1140                    crate::aggregated::ReceiverType::Bounded(bounded)
1141                }
1142            };
1143            receivers.push((topic_path.clone(), receiver_type));
1144        }
1145
1146        // 3. Return aggregated receiver with support for both channel types
1147        Ok(crate::aggregated::AggregatedReceiver::new_mixed(
1148            pattern.to_string(),
1149            receivers,
1150        ))
1151    }
1152
1153    /// Publish a message to a topic using a raw string path (for RPC and advanced use cases)
1154    ///
1155    /// # Arguments
1156    ///
1157    /// * `topic_path` - Raw topic path string
1158    /// * `message` - Message to publish
1159    ///
1160    /// # Note
1161    ///
1162    /// This is a low-level method. Prefer using `publish_to()` with type-safe topics
1163    /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
1164    #[cfg(feature = "messaging")]
1165    pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
1166    where
1167        T: Message + Serialize,
1168    {
1169        let mut bus = self.message_bus.write().await;
1170
1171        bus.publish(topic_path, message)
1172            .await
1173            .with_context(|| {
1174                format!(
1175                    "Failed to publish to raw topic '{}' from node '{}'",
1176                    topic_path, self.node_id
1177                )
1178            })
1179            .map_err(|e| Mecha10Error::MessagingError {
1180                message: format!("{:#}", e),
1181                suggestion: "Check that Redis is running and accepting connections".to_string(),
1182            })?;
1183
1184        Ok(())
1185    }
1186
1187    /// Check if shutdown has been requested
1188    pub async fn is_shutdown(&self) -> bool {
1189        *self.shutdown.read().await
1190    }
1191
1192    /// Request shutdown
1193    pub async fn shutdown(&self) {
1194        *self.shutdown.write().await = true;
1195
1196        // Stop Redis bridge if running
1197        #[cfg(feature = "messaging")]
1198        {
1199            let bridge_opt = self.redis_bridge.read().await;
1200            if let Some(bridge) = bridge_opt.as_ref() {
1201                if let Err(e) = bridge.stop().await {
1202                    tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1203                }
1204            }
1205        }
1206    }
1207
1208    /// Wait for shutdown signal
1209    pub async fn wait_for_shutdown(&self) {
1210        loop {
1211            if self.is_shutdown().await {
1212                break;
1213            }
1214            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1215        }
1216    }
1217
1218    /// Enter a logging span with this node's context
1219    ///
1220    /// This returns a guard that, when entered, adds the node_id to all log messages.
1221    /// The span is automatically exited when the guard is dropped.
1222    ///
1223    /// # Example
1224    ///
1225    /// ```rust
1226    /// use mecha10::prelude::*;
1227    ///
1228    /// # async fn example(ctx: &Context) -> Result<()> {
1229    /// let _guard = ctx.logging_span().entered();
1230    /// info!("This message includes node_id automatically");
1231    /// debug!("So does this one");
1232    /// # Ok(())
1233    /// # }
1234    /// ```
1235    pub fn logging_span(&self) -> tracing::Span {
1236        tracing::info_span!("node", node_id = %self.node_id)
1237    }
1238
1239    /// Subscribe to all camera topics
1240    ///
1241    /// Convenience method for `subscribe_aggregated::<Image>("/sensor/camera/*")`
1242    ///
1243    /// # Example
1244    ///
1245    /// ```rust,no_run
1246    /// use mecha10::prelude::*;
1247    ///
1248    /// # async fn example(ctx: &Context) -> Result<()> {
1249    /// let mut cameras = ctx.subscribe_all_cameras().await?;
1250    /// while let Some((topic, image)) = cameras.recv().await {
1251    ///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1252    ///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1253    /// }
1254    /// # Ok(())
1255    /// # }
1256    /// ```
1257    #[cfg(feature = "messaging")]
1258    pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
1259        self.subscribe_aggregated("/sensor/camera/*").await
1260    }
1261
1262    /// Subscribe to all sensor topics
1263    ///
1264    /// Subscribes to all sensor data including cameras, LiDAR, IMU, odometry, etc.
1265    ///
1266    /// # Example
1267    ///
1268    /// ```rust,no_run
1269    /// use mecha10::prelude::*;
1270    ///
1271    /// # async fn example(ctx: &Context) -> Result<()> {
1272    /// // Subscribe to all sensors (returns untyped messages)
1273    /// let topics = ctx.discover_topics("/sensor/**").await?;
1274    /// println!("Found {} sensor topics", topics.len());
1275    /// # Ok(())
1276    /// # }
1277    /// ```
1278    #[cfg(feature = "messaging")]
1279    pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
1280        self.discover_topics("/sensor/**").await
1281    }
1282
1283    /// Subscribe to all LiDAR topics
1284    ///
1285    /// Convenience method for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`
1286    #[cfg(feature = "messaging")]
1287    pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
1288        self.subscribe_aggregated("/sensor/lidar/*").await
1289    }
1290
1291    /// Subscribe to all IMU topics
1292    ///
1293    /// Convenience method for `subscribe_aggregated::<IMU>("/sensor/imu/*")`
1294    #[cfg(feature = "messaging")]
1295    pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
1296        self.subscribe_aggregated("/sensor/imu/*").await
1297    }
1298
1299    /// Subscribe to all motor topics
1300    ///
1301    /// Convenience method for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`
1302    #[cfg(feature = "messaging")]
1303    pub async fn subscribe_all_motors(
1304        &self,
1305    ) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
1306        self.subscribe_aggregated("/actuator/motor/*").await
1307    }
1308
1309    /// Subscribe to a topic pattern with wildcard
1310    ///
1311    /// Generic helper for subscribing to multiple topics matching a pattern.
1312    ///
1313    /// # Example
1314    ///
1315    /// ```rust,no_run
1316    /// use mecha10::prelude::*;
1317    /// use mecha10::topics::sensor;
1318    ///
1319    /// # async fn example(ctx: &Context) -> Result<()> {
1320    /// // Subscribe to all instances of a specific topic type
1321    /// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
1322    /// # Ok(())
1323    /// # }
1324    /// ```
1325    #[cfg(feature = "messaging")]
1326    pub async fn subscribe_pattern<T>(
1327        &self,
1328        topic: crate::topics::Topic<T>,
1329        pattern: &str,
1330    ) -> Result<crate::aggregated::AggregatedReceiver<T>>
1331    where
1332        T: Message + DeserializeOwned + Send + 'static,
1333    {
1334        let full_pattern = format!("{}/{}", topic.path(), pattern);
1335        self.subscribe_aggregated(&full_pattern).await
1336    }
1337
1338    /// Get or create a state manager for this node
1339    ///
1340    /// The state manager provides persistent storage for node state across restarts.
1341    /// By default, it uses filesystem storage in `./state/` directory.
1342    ///
1343    /// # Example
1344    ///
1345    /// ```rust,no_run
1346    /// use mecha10::prelude::*;
1347    /// use serde::{Deserialize, Serialize};
1348    ///
1349    /// #[derive(Debug, Clone, Serialize, Deserialize)]
1350    /// struct MyState {
1351    ///     counter: u64,
1352    ///     position: (f64, f64),
1353    /// }
1354    ///
1355    /// # async fn example(ctx: &Context) -> Result<()> {
1356    /// let state_mgr = ctx.state_manager().await?;
1357    ///
1358    /// // Load previous state
1359    /// let mut state = state_mgr
1360    ///     .load::<MyState>("my_node")
1361    ///     .await?
1362    ///     .unwrap_or(MyState {
1363    ///         counter: 0,
1364    ///         position: (0.0, 0.0),
1365    ///     });
1366    ///
1367    /// // Update state
1368    /// state.counter += 1;
1369    ///
1370    /// // Save state
1371    /// state_mgr.save("my_node", &state).await?;
1372    /// # Ok(())
1373    /// # }
1374    /// ```
1375    pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1376        let mut mgr = self.state_manager.write().await;
1377
1378        if let Some(existing) = mgr.as_ref() {
1379            return Ok(Arc::clone(existing));
1380        }
1381
1382        // Create default filesystem-based state manager
1383        let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1384        let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1385        let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1386
1387        *mgr = Some(Arc::clone(&arc_mgr));
1388
1389        Ok(arc_mgr)
1390    }
1391
1392    /// Set a custom state manager for this context
1393    ///
1394    /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1395    ///
1396    /// # Example
1397    ///
1398    /// ```rust,no_run
1399    /// use mecha10::prelude::*;
1400    /// use std::sync::Arc;
1401    ///
1402    /// # async fn example(ctx: &Context) -> Result<()> {
1403    /// // Use in-memory state manager for testing
1404    /// let mem_mgr = Arc::new(MemoryStateManager::new());
1405    /// ctx.set_state_manager(mem_mgr).await;
1406    /// # Ok(())
1407    /// # }
1408    /// ```
1409    pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1410        let mut mgr = self.state_manager.write().await;
1411        *mgr = Some(manager);
1412    }
1413
1414    // ========================================
1415    // Topic Discovery Helpers
1416    // ========================================
1417
1418    /// List all sensor topics currently active in the system
1419    ///
1420    /// Returns all topics under `/sensor/*` including cameras, LiDAR, IMU, GPS, etc.
1421    ///
1422    /// # Example
1423    ///
1424    /// ```rust,no_run
1425    /// use mecha10::prelude::*;
1426    ///
1427    /// # async fn example(ctx: &Context) -> Result<()> {
1428    /// let sensors = ctx.list_sensor_topics().await?;
1429    /// println!("Available sensors:");
1430    /// for topic in sensors {
1431    ///     println!("  - {}", topic);
1432    /// }
1433    /// # Ok(())
1434    /// # }
1435    /// ```
1436    #[cfg(feature = "messaging")]
1437    pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
1438        self.discover_topics("/sensor/*").await
1439    }
1440
1441    /// List all actuator topics currently active in the system
1442    ///
1443    /// Returns all topics under `/actuator/*` including motors, servos, grippers, etc.
1444    ///
1445    /// # Example
1446    ///
1447    /// ```rust,no_run
1448    /// use mecha10::prelude::*;
1449    ///
1450    /// # async fn example(ctx: &Context) -> Result<()> {
1451    /// let actuators = ctx.list_actuator_topics().await?;
1452    /// println!("Available actuators:");
1453    /// for topic in actuators {
1454    ///     println!("  - {}", topic);
1455    /// }
1456    /// # Ok(())
1457    /// # }
1458    /// ```
1459    #[cfg(feature = "messaging")]
1460    pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
1461        self.discover_topics("/actuator/*").await
1462    }
1463
1464    /// List all topics in the system
1465    ///
1466    /// Returns all active topics regardless of category.
1467    ///
1468    /// # Example
1469    ///
1470    /// ```rust,no_run
1471    /// use mecha10::prelude::*;
1472    ///
1473    /// # async fn example(ctx: &Context) -> Result<()> {
1474    /// let all_topics = ctx.list_all_topics().await?;
1475    /// println!("All active topics ({} total):", all_topics.len());
1476    /// for topic in all_topics {
1477    ///     println!("  - {}", topic);
1478    /// }
1479    /// # Ok(())
1480    /// # }
1481    /// ```
1482    #[cfg(feature = "messaging")]
1483    pub async fn list_all_topics(&self) -> Result<Vec<String>> {
1484        self.discover_topics("*").await
1485    }
1486
1487    /// List all camera topics
1488    ///
1489    /// Returns all topics under `/sensor/camera/*`.
1490    ///
1491    /// # Example
1492    ///
1493    /// ```rust,no_run
1494    /// use mecha10::prelude::*;
1495    ///
1496    /// # async fn example(ctx: &Context) -> Result<()> {
1497    /// let cameras = ctx.list_camera_topics().await?;
1498    /// println!("Available cameras:");
1499    /// for topic in cameras {
1500    ///     println!("  - {}", topic);
1501    /// }
1502    /// # Ok(())
1503    /// # }
1504    /// ```
1505    #[cfg(feature = "messaging")]
1506    pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
1507        self.discover_topics("/sensor/camera/*").await
1508    }
1509
1510    /// List all LiDAR topics
1511    ///
1512    /// Returns all topics under `/sensor/lidar/*`.
1513    #[cfg(feature = "messaging")]
1514    pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
1515        self.discover_topics("/sensor/lidar/*").await
1516    }
1517
1518    /// List all IMU topics
1519    ///
1520    /// Returns all topics under `/sensor/imu/*`.
1521    #[cfg(feature = "messaging")]
1522    pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
1523        self.discover_topics("/sensor/imu/*").await
1524    }
1525
1526    /// List all motor topics
1527    ///
1528    /// Returns all topics under `/actuator/motor/*`.
1529    #[cfg(feature = "messaging")]
1530    pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
1531        self.discover_topics("/actuator/motor/*").await
1532    }
1533
1534    /// Print a formatted summary of all active topics
1535    ///
1536    /// Useful for debugging and introspection. Groups topics by category
1537    /// and displays them in a human-readable format.
1538    ///
1539    /// # Example
1540    ///
1541    /// ```rust,no_run
1542    /// use mecha10::prelude::*;
1543    ///
1544    /// # async fn example(ctx: &Context) -> Result<()> {
1545    /// ctx.print_topic_summary().await?;
1546    /// // Output:
1547    /// // === Active Topics ===
1548    /// // Sensors (3):
1549    /// //   - /sensor/camera/front
1550    /// //   - /sensor/lidar/main
1551    /// //   - /sensor/imu/base
1552    /// // Actuators (2):
1553    /// //   - /actuator/motor/left
1554    /// //   - /actuator/motor/right
1555    /// // =====================
1556    /// # Ok(())
1557    /// # }
1558    /// ```
1559    #[cfg(feature = "messaging")]
1560    pub async fn print_topic_summary(&self) -> Result<()> {
1561        let sensors = self.list_sensor_topics().await.unwrap_or_default();
1562        let actuators = self.list_actuator_topics().await.unwrap_or_default();
1563        let all_topics = self.list_all_topics().await?;
1564
1565        let other_topics: Vec<String> = all_topics
1566            .into_iter()
1567            .filter(|t| !t.starts_with("/sensor/") && !t.starts_with("/actuator/"))
1568            .collect();
1569
1570        println!("=== Active Topics ===");
1571
1572        if !sensors.is_empty() {
1573            println!("Sensors ({}):", sensors.len());
1574            for topic in sensors {
1575                println!("  - {}", topic);
1576            }
1577        }
1578
1579        if !actuators.is_empty() {
1580            println!("Actuators ({}):", actuators.len());
1581            for topic in actuators {
1582                println!("  - {}", topic);
1583            }
1584        }
1585
1586        if !other_topics.is_empty() {
1587            println!("Other ({}):", other_topics.len());
1588            for topic in other_topics {
1589                println!("  - {}", topic);
1590            }
1591        }
1592
1593        println!("=====================");
1594
1595        Ok(())
1596    }
1597}