mecha10_core/context/mod.rs
1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
6//! This module has been refactored into smaller sub-modules for better organization:
7//! - `infra_config`: Infrastructure configuration types
8//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27// Import the complete Context struct and all implementations from the backup file
28// This maintains backward compatibility while improving code organization
29
30pub struct Context {
31 /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
32 node_id: String,
33
34 /// Optional instance name
35 instance: Option<String>,
36
37 /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
38 #[cfg(feature = "messaging")]
39 message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,
40
41 /// Control plane message bus for robot-to-cloud communication (optional)
42 #[cfg(feature = "messaging")]
43 control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,
44
45 /// Redis bridge for forwarding messages between local and control plane
46 #[cfg(feature = "messaging")]
47 redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,
48
49 /// Shutdown signal
50 shutdown: Arc<RwLock<bool>>,
51
52 /// State manager for persistent node state (lazy initialized)
53 state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
54}
55impl Clone for Context {
56 fn clone(&self) -> Self {
57 Self {
58 node_id: self.node_id.clone(),
59 instance: self.instance.clone(),
60 #[cfg(feature = "messaging")]
61 message_bus: Arc::clone(&self.message_bus),
62 #[cfg(feature = "messaging")]
63 control_plane_bus: Arc::clone(&self.control_plane_bus),
64 #[cfg(feature = "messaging")]
65 redis_bridge: Arc::clone(&self.redis_bridge),
66 shutdown: Arc::clone(&self.shutdown),
67 state_manager: Arc::clone(&self.state_manager),
68 }
69 }
70}
71impl Context {
72 /// Create a new context
73 ///
74 /// # Arguments
75 ///
76 /// * `node_id` - Unique identifier for this node (fallback if NODE_NAME env var not set)
77 ///
78 /// # Environment Variables
79 ///
80 /// * `NODE_NAME` - If set, overrides the `node_id` parameter. This allows node-runner
81 /// to set full identifiers like `@mecha10/speaker` for consistent naming across
82 /// the CLI, dashboard, and health reporting.
83 ///
84 /// # Example
85 ///
86 /// ```rust
87 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
88 /// let ctx = Context::new("my-node").await?;
89 /// # Ok(())
90 /// # }
91 /// ```
92 pub async fn new(node_id: &str) -> Result<Self> {
93 // Use NODE_NAME from environment if set (for consistent naming with node-runner)
94 // This allows bundled nodes to report health with full identifiers like @mecha10/speaker
95 let node_id = std::env::var("NODE_NAME").unwrap_or_else(|_| node_id.to_string());
96 #[cfg(feature = "messaging")]
97 let (message_bus, control_plane_bus, redis_bridge) = {
98 // Load infrastructure config
99 let config = Self::load_infrastructure_config()?;
100
101 // Connect to local Redis
102 let redis_url = Self::get_redis_url_from_config(&config)?;
103 let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, &node_id)
104 .await
105 .with_context(|| {
106 format!(
107 "Failed to connect local message bus for node '{}' to Redis at '{}'",
108 node_id, redis_url
109 )
110 })
111 .map_err(|e| Mecha10Error::MessagingError {
112 message: format!("{:#}", e),
113 suggestion: "Ensure local Redis is running at the configured URL".to_string(),
114 })?;
115
116 // Connect to control plane Redis if configured
117 let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
118 let cp_url = &cp_config.url;
119 match mecha10_messaging::MessageBus::connect(cp_url, &node_id).await {
120 Ok(bus) => {
121 tracing::info!(
122 "Connected to control plane Redis at '{}' for node '{}'",
123 cp_url,
124 node_id
125 );
126 Some(bus)
127 }
128 Err(e) => {
129 tracing::warn!(
130 "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
131 cp_url,
132 e
133 );
134 None
135 }
136 }
137 } else {
138 None
139 };
140
141 // Initialize Redis bridge if both connections are available
142 let bridge = if let Some(_cp_bus_ref) = &cp_bus {
143 // Get bridge configuration (use default if not specified)
144 let bridge_config = config.bridge.unwrap_or_default();
145
146 if bridge_config.enabled {
147 // Create bridge with separate clones of the buses
148 let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, &node_id)
149 .await
150 .map_err(|e| Mecha10Error::MessagingError {
151 message: format!("Failed to create bridge connection to local Redis: {}", e),
152 suggestion: "Check local Redis connection".to_string(),
153 })?;
154
155 let cp_url = config
156 .control_plane_redis
157 .as_ref()
158 .map(|c| c.url.clone())
159 .unwrap_or_default();
160 let bridge_cp_bus = mecha10_messaging::MessageBus::connect(&cp_url, &node_id)
161 .await
162 .map_err(|e| Mecha10Error::MessagingError {
163 message: format!("Failed to create bridge connection to control plane Redis: {}", e),
164 suggestion: "Check control plane Redis connection".to_string(),
165 })?;
166
167 // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
168 let redis_bridge_config = crate::redis_bridge::BridgeConfig {
169 enabled: bridge_config.enabled,
170 local_topics: bridge_config.local_topics,
171 bridged_topics: bridge_config.bridged_topics,
172 robot_id: bridge_config.robot_id,
173 };
174
175 let bridge =
176 crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
177
178 // Start the bridge
179 if let Err(e) = bridge.start().await {
180 tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
181 None
182 } else {
183 tracing::info!("Redis bridge started successfully");
184 Some(bridge)
185 }
186 } else {
187 tracing::debug!("Redis bridge is disabled in configuration");
188 None
189 }
190 } else {
191 None
192 };
193
194 (local_bus, cp_bus, bridge)
195 };
196
197 Ok(Self {
198 node_id,
199 instance: None,
200 #[cfg(feature = "messaging")]
201 message_bus: Arc::new(RwLock::new(message_bus)),
202 #[cfg(feature = "messaging")]
203 control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
204 #[cfg(feature = "messaging")]
205 redis_bridge: Arc::new(RwLock::new(redis_bridge)),
206 shutdown: Arc::new(RwLock::new(false)),
207 state_manager: Arc::new(RwLock::new(None)),
208 })
209 }
210
211 /// Set an instance name for this context (builder pattern)
212 ///
213 /// This allows the same node implementation to run multiple instances
214 /// with different names. The node_id becomes "{node_name}/{instance}".
215 ///
216 /// # Example
217 ///
218 /// ```rust
219 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
220 /// // Single instance (no instance name)
221 /// let ctx = Context::new("camera_fake").await?;
222 /// // node_id: "camera_fake"
223 ///
224 /// // Multi-instance (with instance names)
225 /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
226 /// // node_id: "camera_fake/left"
227 ///
228 /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
229 /// // node_id: "camera_fake/right"
230 /// # Ok(())
231 /// # }
232 /// ```
233 pub fn with_instance(mut self, instance: &str) -> Self {
234 self.instance = Some(instance.to_string());
235 self.node_id = format!("{}/{}", self.node_id, instance);
236 self
237 }
238
239 /// Get the instance name if set
240 ///
241 /// Returns `None` for single-instance nodes, or `Some(instance)` for
242 /// multi-instance nodes.
243 pub fn instance(&self) -> Option<&str> {
244 self.instance.as_deref()
245 }
246
247 /// Load infrastructure configuration based on environment
248 ///
249 /// Priority order:
250 /// 1. mecha10.json (project configuration) - redis.url field
251 /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
252 /// 3. config/infrastructure.<environment>.yaml
253 ///
254 /// Set via MECHA10_ENVIRONMENT environment variable.
255 fn load_infrastructure_config() -> Result<InfrastructureConfig> {
256 use crate::config::load_config;
257 use std::env;
258 use std::path::PathBuf;
259
260 // 1. Try loading from mecha10.json FIRST (project configuration)
261 if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
262 return Ok(InfrastructureConfig {
263 redis: crate::context::infra_config::RedisConfig {
264 url: redis_url,
265 max_connections: 10,
266 connection_timeout_ms: 5000,
267 },
268 control_plane_redis: None,
269 bridge: None,
270 postgres: None,
271 mlflow: None,
272 minio: None,
273 });
274 }
275
276 // 2. Check for environment variable overrides
277 // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
278 if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
279 return Ok(InfrastructureConfig {
280 redis: crate::context::infra_config::RedisConfig {
281 url: redis_url,
282 max_connections: 10,
283 connection_timeout_ms: 5000,
284 },
285 control_plane_redis: None,
286 bridge: None,
287 postgres: None,
288 mlflow: None,
289 minio: None,
290 });
291 }
292
293 // 3. Load from infrastructure.yaml
294 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
295
296 let config_filename = format!("infrastructure.{}.yaml", environment);
297
298 // Try multiple paths in order:
299 // 1. Current directory (for running from workspace root)
300 // 2. Workspace root (for running tests from target/debug/deps)
301 // 3. Two levels up (for running tests from packages/core/target/debug/deps)
302 let paths_to_try = vec![
303 PathBuf::from(format!("config/{}", config_filename)),
304 PathBuf::from(format!("../../config/{}", config_filename)),
305 PathBuf::from(format!("../../../../config/{}", config_filename)),
306 ];
307
308 for path in &paths_to_try {
309 if path.exists() {
310 return load_config(path)
311 .with_context(|| {
312 format!(
313 "Failed to load infrastructure config for environment '{}' from '{}'",
314 environment,
315 path.display()
316 )
317 })
318 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
319 }
320 }
321
322 // If no config file found, return error with helpful message
323 Err(Mecha10Error::Configuration(format!(
324 "Failed to load infrastructure config for environment '{}'. \
325 Tried paths: {}. \
326 Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
327 or configure redis.url in mecha10.json.",
328 environment,
329 paths_to_try
330 .iter()
331 .map(|p| p.display().to_string())
332 .collect::<Vec<_>>()
333 .join(", ")
334 )))
335 }
336
337 /// Try to load Redis URL from mecha10.json
338 ///
339 /// Looks for mecha10.json in current directory and reads redis.url field.
340 fn try_load_redis_from_mecha10_json() -> Result<String> {
341 use std::path::PathBuf;
342
343 // Try multiple paths to find mecha10.json
344 let paths_to_try = vec![
345 PathBuf::from("mecha10.json"),
346 PathBuf::from("../mecha10.json"),
347 PathBuf::from("../../mecha10.json"),
348 ];
349
350 for path in &paths_to_try {
351 if !path.exists() {
352 continue;
353 }
354
355 // Read and parse mecha10.json
356 let content = std::fs::read_to_string(path)
357 .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
358
359 let json: serde_json::Value = serde_json::from_str(&content)
360 .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
361
362 // Extract redis.url
363 if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
364 return Ok(redis_url.to_string());
365 }
366 }
367
368 Err(Mecha10Error::Configuration(
369 "No mecha10.json found with redis.url configuration".to_string(),
370 ))
371 }
372
373 /// Get Redis URL from infrastructure configuration (helper for internal use)
374 fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
375 // First try environment variables (for backward compatibility and testing)
376 if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
377 return Ok(url);
378 }
379 if let Ok(url) = std::env::var("REDIS_URL") {
380 return Ok(url);
381 }
382
383 // Use config
384 Ok(config.redis.url.clone())
385 }
386
387 /// Get Redis URL from infrastructure configuration
388 pub fn get_redis_url() -> Result<String> {
389 let config = Self::load_infrastructure_config()?;
390 Self::get_redis_url_from_config(&config)
391 }
392
393 /// Get the node ID
394 pub fn node_id(&self) -> &str {
395 &self.node_id
396 }
397
398 /// Load node configuration with environment-aware file-level fallback
399 ///
400 /// This method implements the V2 configuration system with:
401 /// - Environment-specific configs (dev/staging/production)
402 /// - File-level fallback (no key-by-key merging)
403 /// - Automatic path resolution
404 ///
405 /// **Fallback Priority:**
406 /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
407 /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
408 /// 3. `T::default()` (code default)
409 ///
410 /// **Environment Detection:**
411 /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
412 /// - Supported values: "dev", "staging", "production", or custom
413 ///
414 /// # Type Parameters
415 ///
416 /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
417 ///
418 /// # Arguments
419 ///
420 /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
421 ///
422 /// # Returns
423 ///
424 /// The loaded configuration of type `T`, using the first available source
425 /// in the fallback chain.
426 ///
427 /// # Fallback Chain
428 ///
429 /// Configs are searched in order:
430 /// 1. `configs/{env}/nodes/{node_name}/config.json` - Environment-specific
431 /// 2. `configs/common/nodes/{node_name}/config.json` - Common config
432 /// 3. `configs/nodes/@mecha10/{node_name}/config.json` - Framework nodes
433 /// 4. `configs/nodes/{node_name}/config.json` - Project nodes (preferred)
434 /// 5. `configs/nodes/@local/{node_name}/config.json` - Legacy format
435 /// 6. `T::default()` - Default implementation
436 ///
437 /// # Example
438 ///
439 /// ```rust
440 /// use mecha10::prelude::*;
441 /// use serde::{Deserialize, Serialize};
442 ///
443 /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
444 /// struct SimBridgeConfig {
445 /// camera_width: u32,
446 /// camera_height: u32,
447 /// camera_fps: u32,
448 /// }
449 ///
450 /// # async fn example(ctx: &Context) -> Result<()> {
451 /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
452 /// # Ok(())
453 /// # }
454 /// ```
455 pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
456 where
457 T: DeserializeOwned + Default,
458 {
459 use crate::config::load_config;
460 use std::env;
461 use std::path::PathBuf;
462
463 // Get environment from env var (default: "dev")
464 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
465
466 // Try multiple base paths (for running from different locations)
467 let base_paths = vec![
468 PathBuf::from("."), // Current directory
469 PathBuf::from("../.."), // From target/debug/deps
470 PathBuf::from("../../../.."), // From packages/*/target/debug/deps
471 ];
472
473 // Build fallback chain for each base path
474 for base_path in &base_paths {
475 // 1. Try environment-specific config
476 let env_config_path = base_path
477 .join("configs")
478 .join(&environment)
479 .join("nodes")
480 .join(node_name)
481 .join("config.json");
482
483 if env_config_path.exists() {
484 tracing::debug!(
485 "Loading node config for '{}' from environment-specific path: {}",
486 node_name,
487 env_config_path.display()
488 );
489 return load_config(&env_config_path)
490 .with_context(|| {
491 format!(
492 "Failed to load node config for '{}' from '{}'",
493 node_name,
494 env_config_path.display()
495 )
496 })
497 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
498 }
499
500 // 2. Try common config
501 let common_config_path = base_path
502 .join("configs")
503 .join("common")
504 .join("nodes")
505 .join(node_name)
506 .join("config.json");
507
508 if common_config_path.exists() {
509 tracing::debug!(
510 "Loading node config for '{}' from common path: {}",
511 node_name,
512 common_config_path.display()
513 );
514 return load_config(&common_config_path)
515 .with_context(|| {
516 format!(
517 "Failed to load node config for '{}' from '{}'",
518 node_name,
519 common_config_path.display()
520 )
521 })
522 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
523 }
524
525 // 3. Try new flat structure: configs/nodes/@mecha10/{node_name}/config.json
526 // This format supports environment-aware configs: { "dev": {...}, "production": {...} }
527 let flat_config_path = base_path
528 .join("configs")
529 .join("nodes")
530 .join("@mecha10")
531 .join(node_name)
532 .join("config.json");
533
534 if flat_config_path.exists() {
535 tracing::debug!(
536 "Loading node config for '{}' from flat path: {}",
537 node_name,
538 flat_config_path.display()
539 );
540 return Self::load_environment_aware_config(&flat_config_path, &environment)
541 .with_context(|| {
542 format!(
543 "Failed to load node config for '{}' from '{}'",
544 node_name,
545 flat_config_path.display()
546 )
547 })
548 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
549 }
550
551 // 4. Try plain project nodes: configs/nodes/{node_name}/config.json
552 // This is the preferred format for project-local nodes
553 let project_config_path = base_path
554 .join("configs")
555 .join("nodes")
556 .join(node_name)
557 .join("config.json");
558
559 if project_config_path.exists() {
560 tracing::debug!(
561 "Loading node config for '{}' from project path: {}",
562 node_name,
563 project_config_path.display()
564 );
565 return Self::load_environment_aware_config(&project_config_path, &environment)
566 .with_context(|| {
567 format!(
568 "Failed to load node config for '{}' from '{}'",
569 node_name,
570 project_config_path.display()
571 )
572 })
573 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
574 }
575
576 // 5. Try @local nodes (legacy): configs/nodes/@local/{node_name}/config.json
577 let local_config_path = base_path
578 .join("configs")
579 .join("nodes")
580 .join("@local")
581 .join(node_name)
582 .join("config.json");
583
584 if local_config_path.exists() {
585 tracing::debug!(
586 "Loading node config for '{}' from legacy local path: {}",
587 node_name,
588 local_config_path.display()
589 );
590 return Self::load_environment_aware_config(&local_config_path, &environment)
591 .with_context(|| {
592 format!(
593 "Failed to load node config for '{}' from '{}'",
594 node_name,
595 local_config_path.display()
596 )
597 })
598 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
599 }
600 }
601
602 // 3. Fall back to default
603 tracing::debug!(
604 "No config file found for node '{}', using Default implementation",
605 node_name
606 );
607 Ok(T::default())
608 }
609
610 /// Load a config file that may be in environment-aware format.
611 ///
612 /// This handles configs with the format:
613 /// ```json
614 /// {
615 /// "dev": { ... config ... },
616 /// "production": { ... config ... }
617 /// }
618 /// ```
619 ///
620 /// Falls back to direct deserialization for backwards compatibility with
621 /// configs that don't use the environment-aware format.
622 fn load_environment_aware_config<T>(path: &std::path::Path, environment: &str) -> Result<T>
623 where
624 T: DeserializeOwned + Default,
625 {
626 let content =
627 std::fs::read_to_string(path).with_context(|| format!("Failed to read config file: {}", path.display()))?;
628
629 // First, try to parse as a generic JSON value to check structure
630 let json_value: serde_json::Value =
631 serde_json::from_str(&content).with_context(|| format!("Failed to parse JSON from: {}", path.display()))?;
632
633 // Check if this is an environment-aware config (has "dev" or "production" keys)
634 if let Some(obj) = json_value.as_object() {
635 if obj.contains_key("dev") || obj.contains_key("production") {
636 // This is an environment-aware config, extract the right section
637 let env_key = match environment {
638 "production" | "prod" => "production",
639 _ => "dev",
640 };
641
642 if let Some(env_section) = obj.get(env_key) {
643 tracing::debug!("Loading '{}' environment section from: {}", env_key, path.display());
644 return serde_json::from_value(env_section.clone())
645 .with_context(|| {
646 format!("Failed to deserialize '{}' section from: {}", env_key, path.display())
647 })
648 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
649 } else {
650 // Requested environment not found, try the other one
651 let fallback_key = if env_key == "dev" { "production" } else { "dev" };
652 if let Some(fallback_section) = obj.get(fallback_key) {
653 tracing::warn!(
654 "Environment '{}' not found in config, falling back to '{}': {}",
655 env_key,
656 fallback_key,
657 path.display()
658 );
659 return serde_json::from_value(fallback_section.clone())
660 .with_context(|| {
661 format!(
662 "Failed to deserialize '{}' section from: {}",
663 fallback_key,
664 path.display()
665 )
666 })
667 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
668 }
669 }
670 }
671 }
672
673 // Not an environment-aware config, deserialize directly for backwards compatibility
674 tracing::debug!("Config is not environment-aware, loading directly: {}", path.display());
675 serde_json::from_value(json_value)
676 .with_context(|| format!("Failed to deserialize config from: {}", path.display()))
677 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)))
678 }
679
680 /// Check if control plane Redis connection is available
681 #[cfg(feature = "messaging")]
682 pub async fn has_control_plane(&self) -> bool {
683 self.control_plane_bus.read().await.is_some()
684 }
685
686 /// Get control plane Redis URL from infrastructure configuration
687 pub fn get_control_plane_redis_url() -> Result<Option<String>> {
688 let config = Self::load_infrastructure_config()?;
689 Ok(config.control_plane_redis.map(|c| c.url))
690 }
691
692 /// Subscribe to a topic with type-safe message handling
693 ///
694 /// # Arguments
695 ///
696 /// * `topic` - Type-safe topic to subscribe to
697 ///
698 /// # Returns
699 ///
700 /// A receiver that will yield messages of type `T`
701 ///
702 /// # Example
703 ///
704 /// ```rust
705 /// use mecha10::prelude::*;
706 /// use mecha10::topics::sensor;
707 ///
708 /// # async fn example(ctx: &Context) -> Result<()> {
709 /// // Type is inferred as Receiver<Image>
710 /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
711 ///
712 /// while let Some(image) = images.recv().await {
713 /// println!("Image: {}x{}", image.width, image.height);
714 /// }
715 /// # Ok(())
716 /// # }
717 /// ```
718 #[cfg(feature = "messaging")]
719 pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
720 where
721 T: Message + DeserializeOwned + Send + 'static,
722 {
723 let mut bus = self.message_bus.write().await;
724
725 // Create a unique consumer group for this node
726 let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
727
728 let subscriber = bus
729 .subscribe::<T>(topic.path(), &consumer_group)
730 .await
731 .with_context(|| {
732 format!(
733 "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
734 topic.path(),
735 consumer_group,
736 self.node_id
737 )
738 })
739 .map_err(|e| Mecha10Error::MessagingError {
740 message: format!("{:#}", e),
741 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
742 })?;
743
744 // Convert to our Receiver type
745 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
746
747 // Spawn task to forward messages
748 tokio::spawn(async move {
749 let mut sub = subscriber;
750 while let Some(msg) = sub.recv().await {
751 // Auto-acknowledge message before moving payload
752 if let Err(e) = msg.ack().await {
753 tracing::warn!(
754 topic = %msg.topic,
755 message_id = %msg.id,
756 error = %e,
757 "Failed to acknowledge message (will be redelivered on reconnect)"
758 );
759 }
760 // Extract payload and forward
761 if tx.send(msg.payload).is_err() {
762 break; // Receiver dropped
763 }
764 }
765 });
766
767 Ok(Receiver::from(rx))
768 }
769
770 /// Publish a message to a specific topic
771 ///
772 /// # Arguments
773 ///
774 /// * `topic` - Type-safe topic to publish to
775 /// * `message` - Message to publish (type checked against topic)
776 ///
777 /// # Example
778 ///
779 /// ```rust
780 /// use mecha10::prelude::*;
781 /// use mecha10::topics::sensor;
782 /// use mecha10::messages::Image;
783 ///
784 /// # async fn example(ctx: &Context) -> Result<()> {
785 /// let image = Image {
786 /// timestamp: now_micros(),
787 /// width: 640,
788 /// height: 480,
789 /// encoding: "rgb8".to_string(),
790 /// data: vec![],
791 /// };
792 ///
793 /// // Type checked: image must be of type Image
794 /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
795 /// # Ok(())
796 /// # }
797 /// ```
798 #[cfg(feature = "messaging")]
799 pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
800 where
801 T: Message + Serialize,
802 {
803 let mut bus = self.message_bus.write().await;
804
805 bus.publish(topic.path(), message)
806 .await
807 .with_context(|| {
808 format!(
809 "Failed to publish to topic '{}' from node '{}'",
810 topic.path(),
811 self.node_id
812 )
813 })
814 .map_err(|e| Mecha10Error::MessagingError {
815 message: format!("{:#}", e),
816 suggestion: "Check that Redis is running and accepting connections".to_string(),
817 })?;
818
819 Ok(())
820 }
821
822 /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
823 ///
824 /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
825 ///
826 /// # Arguments
827 /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
828 /// * `message` - The message to publish
829 pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
830 where
831 T: Message + Serialize,
832 {
833 let mut bus = self.message_bus.write().await;
834
835 bus.publish(topic_path, message)
836 .await
837 .with_context(|| {
838 format!(
839 "Failed to publish to topic '{}' from node '{}'",
840 topic_path, self.node_id
841 )
842 })
843 .map_err(|e| Mecha10Error::MessagingError {
844 message: format!("{:#}", e),
845 suggestion: "Check that Redis is running and accepting connections".to_string(),
846 })?;
847
848 Ok(())
849 }
850
851 /// Publish a message to an instance-scoped topic
852 ///
853 /// This automatically appends the context's instance name to the topic path.
854 /// If no instance is set, this behaves like `publish_to()`.
855 ///
856 /// # Arguments
857 ///
858 /// * `topic` - Type-safe topic to publish to
859 /// * `message` - Message to publish (type checked against topic)
860 ///
861 /// # Example
862 ///
863 /// ```rust
864 /// use mecha10::prelude::*;
865 /// use mecha10::topics::sensor;
866 /// use mecha10::messages::Image;
867 ///
868 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
869 /// let ctx = Context::new("camera_fake").await?.with_instance("left");
870 ///
871 /// let image = Image {
872 /// timestamp: now_micros(),
873 /// width: 640,
874 /// height: 480,
875 /// encoding: "rgb8".to_string(),
876 /// data: vec![],
877 /// };
878 ///
879 /// // Publishes to "/sensor/camera/rgb/left"
880 /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
881 /// # Ok(())
882 /// # }
883 /// ```
884 #[cfg(feature = "messaging")]
885 pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
886 where
887 T: Message + Serialize,
888 {
889 let scoped_path = if let Some(instance) = &self.instance {
890 format!("{}/{}", topic.path(), instance)
891 } else {
892 topic.path().to_string()
893 };
894
895 self.publish_raw(&scoped_path, message).await
896 }
897
898 /// Publish a message to a topic with an explicit instance suffix
899 ///
900 /// This allows publishing to a specific instance's topic regardless of
901 /// the context's own instance name.
902 ///
903 /// # Arguments
904 ///
905 /// * `topic` - Type-safe topic to publish to
906 /// * `instance` - Instance name to append to topic path
907 /// * `message` - Message to publish (type checked against topic)
908 ///
909 /// # Example
910 ///
911 /// ```rust
912 /// use mecha10::prelude::*;
913 /// use mecha10::topics::actuator;
914 ///
915 /// # async fn example(ctx: &Context) -> Result<()> {
916 /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
917 ///
918 /// // Send to specific motor instances
919 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
920 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
921 /// # Ok(())
922 /// # }
923 /// ```
924 #[cfg(feature = "messaging")]
925 pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
926 where
927 T: Message + Serialize,
928 {
929 let instance_path = format!("{}/{}", topic.path(), instance);
930 self.publish_raw(&instance_path, message).await
931 }
932
933 /// Subscribe to an instance-scoped topic
934 ///
935 /// This automatically appends the context's instance name to the topic path.
936 /// If no instance is set, this behaves like `subscribe()`.
937 ///
938 /// # Example
939 ///
940 /// ```rust
941 /// use mecha10::prelude::*;
942 /// use mecha10::topics::sensor;
943 ///
944 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
945 /// let ctx = Context::new("vision_node").await?;
946 ///
947 /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
948 /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
949 ///
950 /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
951 /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
952 /// # Ok(())
953 /// # }
954 /// ```
955 #[cfg(feature = "messaging")]
956 pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
957 where
958 T: Message + DeserializeOwned + Send + 'static,
959 {
960 let scoped_path = format!("{}/{}", topic.path(), instance);
961 self.subscribe_raw(&scoped_path).await
962 }
963
964 /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
965 ///
966 /// # Arguments
967 ///
968 /// * `topic_path` - Raw topic path string
969 ///
970 /// # Returns
971 ///
972 /// A receiver that will yield messages of type `T`
973 ///
974 /// # Note
975 ///
976 /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
977 /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
978 #[cfg(feature = "messaging")]
979 pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
980 where
981 T: Message + DeserializeOwned + Send + 'static,
982 {
983 let mut bus = self.message_bus.write().await;
984
985 // Create a unique consumer group for this node
986 let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
987
988 let subscriber = bus
989 .subscribe::<T>(topic_path, &consumer_group)
990 .await
991 .with_context(|| {
992 format!(
993 "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
994 topic_path, consumer_group, self.node_id
995 )
996 })
997 .map_err(|e| Mecha10Error::MessagingError {
998 message: format!("{:#}", e),
999 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
1000 })?;
1001
1002 // Convert to our Receiver type
1003 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1004
1005 // Spawn task to forward messages
1006 tokio::spawn(async move {
1007 let mut sub = subscriber;
1008 while let Some(msg) = sub.recv().await {
1009 // Auto-acknowledge message before moving payload
1010 if let Err(e) = msg.ack().await {
1011 tracing::warn!(
1012 topic = %msg.topic,
1013 message_id = %msg.id,
1014 error = %e,
1015 "Failed to acknowledge message (will be redelivered on reconnect)"
1016 );
1017 }
1018 // Extract payload and forward
1019 if tx.send(msg.payload).is_err() {
1020 break; // Receiver dropped
1021 }
1022 }
1023 });
1024
1025 Ok(Receiver::from(rx))
1026 }
1027
1028 /// Discover topics matching a glob pattern
1029 ///
1030 /// Queries Redis for all topics matching the given pattern without subscribing.
1031 /// Useful for inspecting available topics before subscribing.
1032 ///
1033 /// # Pattern Syntax
1034 ///
1035 /// - `*` matches any sequence of characters within a path segment
1036 /// - `**` matches any sequence of path segments
1037 /// - `{a,b}` matches either a or b (future enhancement)
1038 ///
1039 /// # Arguments
1040 ///
1041 /// * `pattern` - Glob pattern to match topics against
1042 ///
1043 /// # Returns
1044 ///
1045 /// A vector of topic paths that match the pattern
1046 ///
1047 /// # Example
1048 ///
1049 /// ```rust,no_run
1050 /// use mecha10::prelude::*;
1051 ///
1052 /// # async fn example(ctx: &Context) -> Result<()> {
1053 /// // Discover all camera topics
1054 /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
1055 /// println!("Found {} camera topics: {:?}", topics.len(), topics);
1056 /// # Ok(())
1057 /// # }
1058 /// ```
1059 #[cfg(feature = "messaging")]
1060 pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
1061 use crate::aggregated::pattern::convert_glob_to_redis;
1062
1063 let bus = self.message_bus.read().await;
1064 let redis_pattern = convert_glob_to_redis(pattern);
1065
1066 bus.discover_topics(&redis_pattern)
1067 .await
1068 .with_context(|| {
1069 format!(
1070 "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
1071 pattern, redis_pattern, self.node_id
1072 )
1073 })
1074 .map_err(|e| Mecha10Error::MessagingError {
1075 message: format!("{:#}", e),
1076 suggestion: "Check that Redis is running and the pattern is valid".to_string(),
1077 })
1078 }
1079
1080 /// Subscribe to multiple topics matching a glob pattern
1081 ///
1082 /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
1083 /// Each message is tagged with its source topic path.
1084 ///
1085 /// # Pattern Syntax
1086 ///
1087 /// - `*` matches any sequence of characters within a path segment
1088 /// - `**` matches any sequence of path segments
1089 /// - `{a,b}` matches either a or b (future enhancement)
1090 ///
1091 /// # Arguments
1092 ///
1093 /// * `pattern` - Glob pattern to match topics against
1094 ///
1095 /// # Returns
1096 ///
1097 /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
1098 ///
1099 /// # Examples
1100 ///
1101 /// ```rust,no_run
1102 /// use mecha10::prelude::*;
1103 ///
1104 /// # async fn example(ctx: &Context) -> Result<()> {
1105 /// // Subscribe to all cameras with one call
1106 /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
1107 ///
1108 /// while let Some((topic, image)) = cameras.recv().await {
1109 /// let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1110 /// println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1111 /// }
1112 /// # Ok(())
1113 /// # }
1114 /// ```
1115 #[cfg(feature = "messaging")]
1116 pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
1117 where
1118 T: Message + DeserializeOwned + Send + 'static,
1119 {
1120 // 1. Discover all topics matching pattern
1121 let matching_topics = self.discover_topics(pattern).await?;
1122
1123 if matching_topics.is_empty() {
1124 return Err(Mecha10Error::MessagingError {
1125 message: format!("No topics found matching pattern: {}", pattern),
1126 suggestion: "Check that the pattern is correct and topics exist".to_string(),
1127 });
1128 }
1129
1130 // 2. Subscribe to each topic
1131 let mut receivers = Vec::new();
1132 for topic_path in matching_topics {
1133 let rx = self.subscribe_raw::<T>(&topic_path).await?;
1134 // Extract receiver and wrap in ReceiverType enum
1135 let receiver_type = match rx.inner {
1136 crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
1137 crate::aggregated::ReceiverType::Unbounded(unbounded)
1138 }
1139 crate::context::receiver::ReceiverInner::Bounded(bounded) => {
1140 crate::aggregated::ReceiverType::Bounded(bounded)
1141 }
1142 };
1143 receivers.push((topic_path.clone(), receiver_type));
1144 }
1145
1146 // 3. Return aggregated receiver with support for both channel types
1147 Ok(crate::aggregated::AggregatedReceiver::new_mixed(
1148 pattern.to_string(),
1149 receivers,
1150 ))
1151 }
1152
1153 /// Publish a message to a topic using a raw string path (for RPC and advanced use cases)
1154 ///
1155 /// # Arguments
1156 ///
1157 /// * `topic_path` - Raw topic path string
1158 /// * `message` - Message to publish
1159 ///
1160 /// # Note
1161 ///
1162 /// This is a low-level method. Prefer using `publish_to()` with type-safe topics
1163 /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
1164 #[cfg(feature = "messaging")]
1165 pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
1166 where
1167 T: Message + Serialize,
1168 {
1169 let mut bus = self.message_bus.write().await;
1170
1171 bus.publish(topic_path, message)
1172 .await
1173 .with_context(|| {
1174 format!(
1175 "Failed to publish to raw topic '{}' from node '{}'",
1176 topic_path, self.node_id
1177 )
1178 })
1179 .map_err(|e| Mecha10Error::MessagingError {
1180 message: format!("{:#}", e),
1181 suggestion: "Check that Redis is running and accepting connections".to_string(),
1182 })?;
1183
1184 Ok(())
1185 }
1186
1187 /// Check if shutdown has been requested
1188 pub async fn is_shutdown(&self) -> bool {
1189 *self.shutdown.read().await
1190 }
1191
1192 /// Request shutdown
1193 pub async fn shutdown(&self) {
1194 *self.shutdown.write().await = true;
1195
1196 // Stop Redis bridge if running
1197 #[cfg(feature = "messaging")]
1198 {
1199 let bridge_opt = self.redis_bridge.read().await;
1200 if let Some(bridge) = bridge_opt.as_ref() {
1201 if let Err(e) = bridge.stop().await {
1202 tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1203 }
1204 }
1205 }
1206 }
1207
1208 /// Wait for shutdown signal
1209 pub async fn wait_for_shutdown(&self) {
1210 loop {
1211 if self.is_shutdown().await {
1212 break;
1213 }
1214 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1215 }
1216 }
1217
1218 /// Enter a logging span with this node's context
1219 ///
1220 /// This returns a guard that, when entered, adds the node_id to all log messages.
1221 /// The span is automatically exited when the guard is dropped.
1222 ///
1223 /// # Example
1224 ///
1225 /// ```rust
1226 /// use mecha10::prelude::*;
1227 ///
1228 /// # async fn example(ctx: &Context) -> Result<()> {
1229 /// let _guard = ctx.logging_span().entered();
1230 /// info!("This message includes node_id automatically");
1231 /// debug!("So does this one");
1232 /// # Ok(())
1233 /// # }
1234 /// ```
1235 pub fn logging_span(&self) -> tracing::Span {
1236 tracing::info_span!("node", node_id = %self.node_id)
1237 }
1238
1239 /// Subscribe to all camera topics
1240 ///
1241 /// Convenience method for `subscribe_aggregated::<Image>("/sensor/camera/*")`
1242 ///
1243 /// # Example
1244 ///
1245 /// ```rust,no_run
1246 /// use mecha10::prelude::*;
1247 ///
1248 /// # async fn example(ctx: &Context) -> Result<()> {
1249 /// let mut cameras = ctx.subscribe_all_cameras().await?;
1250 /// while let Some((topic, image)) = cameras.recv().await {
1251 /// let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1252 /// println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1253 /// }
1254 /// # Ok(())
1255 /// # }
1256 /// ```
1257 #[cfg(feature = "messaging")]
1258 pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
1259 self.subscribe_aggregated("/sensor/camera/*").await
1260 }
1261
1262 /// Subscribe to all sensor topics
1263 ///
1264 /// Subscribes to all sensor data including cameras, LiDAR, IMU, odometry, etc.
1265 ///
1266 /// # Example
1267 ///
1268 /// ```rust,no_run
1269 /// use mecha10::prelude::*;
1270 ///
1271 /// # async fn example(ctx: &Context) -> Result<()> {
1272 /// // Subscribe to all sensors (returns untyped messages)
1273 /// let topics = ctx.discover_topics("/sensor/**").await?;
1274 /// println!("Found {} sensor topics", topics.len());
1275 /// # Ok(())
1276 /// # }
1277 /// ```
1278 #[cfg(feature = "messaging")]
1279 pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
1280 self.discover_topics("/sensor/**").await
1281 }
1282
1283 /// Subscribe to all LiDAR topics
1284 ///
1285 /// Convenience method for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`
1286 #[cfg(feature = "messaging")]
1287 pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
1288 self.subscribe_aggregated("/sensor/lidar/*").await
1289 }
1290
1291 /// Subscribe to all IMU topics
1292 ///
1293 /// Convenience method for `subscribe_aggregated::<IMU>("/sensor/imu/*")`
1294 #[cfg(feature = "messaging")]
1295 pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
1296 self.subscribe_aggregated("/sensor/imu/*").await
1297 }
1298
1299 /// Subscribe to all motor topics
1300 ///
1301 /// Convenience method for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`
1302 #[cfg(feature = "messaging")]
1303 pub async fn subscribe_all_motors(
1304 &self,
1305 ) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
1306 self.subscribe_aggregated("/actuator/motor/*").await
1307 }
1308
1309 /// Subscribe to a topic pattern with wildcard
1310 ///
1311 /// Generic helper for subscribing to multiple topics matching a pattern.
1312 ///
1313 /// # Example
1314 ///
1315 /// ```rust,no_run
1316 /// use mecha10::prelude::*;
1317 /// use mecha10::topics::sensor;
1318 ///
1319 /// # async fn example(ctx: &Context) -> Result<()> {
1320 /// // Subscribe to all instances of a specific topic type
1321 /// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
1322 /// # Ok(())
1323 /// # }
1324 /// ```
1325 #[cfg(feature = "messaging")]
1326 pub async fn subscribe_pattern<T>(
1327 &self,
1328 topic: crate::topics::Topic<T>,
1329 pattern: &str,
1330 ) -> Result<crate::aggregated::AggregatedReceiver<T>>
1331 where
1332 T: Message + DeserializeOwned + Send + 'static,
1333 {
1334 let full_pattern = format!("{}/{}", topic.path(), pattern);
1335 self.subscribe_aggregated(&full_pattern).await
1336 }
1337
1338 /// Get or create a state manager for this node
1339 ///
1340 /// The state manager provides persistent storage for node state across restarts.
1341 /// By default, it uses filesystem storage in `./state/` directory.
1342 ///
1343 /// # Example
1344 ///
1345 /// ```rust,no_run
1346 /// use mecha10::prelude::*;
1347 /// use serde::{Deserialize, Serialize};
1348 ///
1349 /// #[derive(Debug, Clone, Serialize, Deserialize)]
1350 /// struct MyState {
1351 /// counter: u64,
1352 /// position: (f64, f64),
1353 /// }
1354 ///
1355 /// # async fn example(ctx: &Context) -> Result<()> {
1356 /// let state_mgr = ctx.state_manager().await?;
1357 ///
1358 /// // Load previous state
1359 /// let mut state = state_mgr
1360 /// .load::<MyState>("my_node")
1361 /// .await?
1362 /// .unwrap_or(MyState {
1363 /// counter: 0,
1364 /// position: (0.0, 0.0),
1365 /// });
1366 ///
1367 /// // Update state
1368 /// state.counter += 1;
1369 ///
1370 /// // Save state
1371 /// state_mgr.save("my_node", &state).await?;
1372 /// # Ok(())
1373 /// # }
1374 /// ```
1375 pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1376 let mut mgr = self.state_manager.write().await;
1377
1378 if let Some(existing) = mgr.as_ref() {
1379 return Ok(Arc::clone(existing));
1380 }
1381
1382 // Create default filesystem-based state manager
1383 let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1384 let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1385 let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1386
1387 *mgr = Some(Arc::clone(&arc_mgr));
1388
1389 Ok(arc_mgr)
1390 }
1391
1392 /// Set a custom state manager for this context
1393 ///
1394 /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1395 ///
1396 /// # Example
1397 ///
1398 /// ```rust,no_run
1399 /// use mecha10::prelude::*;
1400 /// use std::sync::Arc;
1401 ///
1402 /// # async fn example(ctx: &Context) -> Result<()> {
1403 /// // Use in-memory state manager for testing
1404 /// let mem_mgr = Arc::new(MemoryStateManager::new());
1405 /// ctx.set_state_manager(mem_mgr).await;
1406 /// # Ok(())
1407 /// # }
1408 /// ```
1409 pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1410 let mut mgr = self.state_manager.write().await;
1411 *mgr = Some(manager);
1412 }
1413
1414 // ========================================
1415 // Topic Discovery Helpers
1416 // ========================================
1417
1418 /// List all sensor topics currently active in the system
1419 ///
1420 /// Returns all topics under `/sensor/*` including cameras, LiDAR, IMU, GPS, etc.
1421 ///
1422 /// # Example
1423 ///
1424 /// ```rust,no_run
1425 /// use mecha10::prelude::*;
1426 ///
1427 /// # async fn example(ctx: &Context) -> Result<()> {
1428 /// let sensors = ctx.list_sensor_topics().await?;
1429 /// println!("Available sensors:");
1430 /// for topic in sensors {
1431 /// println!(" - {}", topic);
1432 /// }
1433 /// # Ok(())
1434 /// # }
1435 /// ```
1436 #[cfg(feature = "messaging")]
1437 pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
1438 self.discover_topics("/sensor/*").await
1439 }
1440
1441 /// List all actuator topics currently active in the system
1442 ///
1443 /// Returns all topics under `/actuator/*` including motors, servos, grippers, etc.
1444 ///
1445 /// # Example
1446 ///
1447 /// ```rust,no_run
1448 /// use mecha10::prelude::*;
1449 ///
1450 /// # async fn example(ctx: &Context) -> Result<()> {
1451 /// let actuators = ctx.list_actuator_topics().await?;
1452 /// println!("Available actuators:");
1453 /// for topic in actuators {
1454 /// println!(" - {}", topic);
1455 /// }
1456 /// # Ok(())
1457 /// # }
1458 /// ```
1459 #[cfg(feature = "messaging")]
1460 pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
1461 self.discover_topics("/actuator/*").await
1462 }
1463
1464 /// List all topics in the system
1465 ///
1466 /// Returns all active topics regardless of category.
1467 ///
1468 /// # Example
1469 ///
1470 /// ```rust,no_run
1471 /// use mecha10::prelude::*;
1472 ///
1473 /// # async fn example(ctx: &Context) -> Result<()> {
1474 /// let all_topics = ctx.list_all_topics().await?;
1475 /// println!("All active topics ({} total):", all_topics.len());
1476 /// for topic in all_topics {
1477 /// println!(" - {}", topic);
1478 /// }
1479 /// # Ok(())
1480 /// # }
1481 /// ```
1482 #[cfg(feature = "messaging")]
1483 pub async fn list_all_topics(&self) -> Result<Vec<String>> {
1484 self.discover_topics("*").await
1485 }
1486
1487 /// List all camera topics
1488 ///
1489 /// Returns all topics under `/sensor/camera/*`.
1490 ///
1491 /// # Example
1492 ///
1493 /// ```rust,no_run
1494 /// use mecha10::prelude::*;
1495 ///
1496 /// # async fn example(ctx: &Context) -> Result<()> {
1497 /// let cameras = ctx.list_camera_topics().await?;
1498 /// println!("Available cameras:");
1499 /// for topic in cameras {
1500 /// println!(" - {}", topic);
1501 /// }
1502 /// # Ok(())
1503 /// # }
1504 /// ```
1505 #[cfg(feature = "messaging")]
1506 pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
1507 self.discover_topics("/sensor/camera/*").await
1508 }
1509
1510 /// List all LiDAR topics
1511 ///
1512 /// Returns all topics under `/sensor/lidar/*`.
1513 #[cfg(feature = "messaging")]
1514 pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
1515 self.discover_topics("/sensor/lidar/*").await
1516 }
1517
1518 /// List all IMU topics
1519 ///
1520 /// Returns all topics under `/sensor/imu/*`.
1521 #[cfg(feature = "messaging")]
1522 pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
1523 self.discover_topics("/sensor/imu/*").await
1524 }
1525
1526 /// List all motor topics
1527 ///
1528 /// Returns all topics under `/actuator/motor/*`.
1529 #[cfg(feature = "messaging")]
1530 pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
1531 self.discover_topics("/actuator/motor/*").await
1532 }
1533
1534 /// Print a formatted summary of all active topics
1535 ///
1536 /// Useful for debugging and introspection. Groups topics by category
1537 /// and displays them in a human-readable format.
1538 ///
1539 /// # Example
1540 ///
1541 /// ```rust,no_run
1542 /// use mecha10::prelude::*;
1543 ///
1544 /// # async fn example(ctx: &Context) -> Result<()> {
1545 /// ctx.print_topic_summary().await?;
1546 /// // Output:
1547 /// // === Active Topics ===
1548 /// // Sensors (3):
1549 /// // - /sensor/camera/front
1550 /// // - /sensor/lidar/main
1551 /// // - /sensor/imu/base
1552 /// // Actuators (2):
1553 /// // - /actuator/motor/left
1554 /// // - /actuator/motor/right
1555 /// // =====================
1556 /// # Ok(())
1557 /// # }
1558 /// ```
1559 #[cfg(feature = "messaging")]
1560 pub async fn print_topic_summary(&self) -> Result<()> {
1561 let sensors = self.list_sensor_topics().await.unwrap_or_default();
1562 let actuators = self.list_actuator_topics().await.unwrap_or_default();
1563 let all_topics = self.list_all_topics().await?;
1564
1565 let other_topics: Vec<String> = all_topics
1566 .into_iter()
1567 .filter(|t| !t.starts_with("/sensor/") && !t.starts_with("/actuator/"))
1568 .collect();
1569
1570 println!("=== Active Topics ===");
1571
1572 if !sensors.is_empty() {
1573 println!("Sensors ({}):", sensors.len());
1574 for topic in sensors {
1575 println!(" - {}", topic);
1576 }
1577 }
1578
1579 if !actuators.is_empty() {
1580 println!("Actuators ({}):", actuators.len());
1581 for topic in actuators {
1582 println!(" - {}", topic);
1583 }
1584 }
1585
1586 if !other_topics.is_empty() {
1587 println!("Other ({}):", other_topics.len());
1588 for topic in other_topics {
1589 println!(" - {}", topic);
1590 }
1591 }
1592
1593 println!("=====================");
1594
1595 Ok(())
1596 }
1597}