mecha10_core/context/mod.rs
1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
//! This module has been refactored into smaller sub-modules for better organization:
//! - `builder`: The `ContextBuilder` type
//! - `infra_config`: Infrastructure configuration types
//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
// The `Context` struct and its implementations are defined directly in this
// file; only the builder, infrastructure config types and receiver wrapper
// live in the sub-modules declared above.
29
/// Execution context for a node.
///
/// Holds the node's identity, its message-bus connections (only when the
/// `messaging` feature is enabled), a shutdown flag and a lazily initialised
/// state manager. Every shared piece is stored behind `Arc<RwLock<..>>`, so
/// clones of a `Context` refer to the same underlying values.
pub struct Context {
    /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
    node_id: String,

    /// Optional instance name (`None` for single-instance nodes)
    instance: Option<String>,

    /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
    #[cfg(feature = "messaging")]
    message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,

    /// Control plane message bus for robot-to-cloud communication
    /// (`None` when not configured or when the connection attempt failed)
    #[cfg(feature = "messaging")]
    control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,

    /// Redis bridge for forwarding messages between local and control plane
    /// (`None` when the bridge is disabled or could not be started)
    #[cfg(feature = "messaging")]
    redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,

    /// Shutdown signal: starts as `false` and is set to `true` by `shutdown()`
    shutdown: Arc<RwLock<bool>>,

    /// State manager for persistent node state (lazy initialized; starts as `None`)
    state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
}
55impl Clone for Context {
56 fn clone(&self) -> Self {
57 Self {
58 node_id: self.node_id.clone(),
59 instance: self.instance.clone(),
60 #[cfg(feature = "messaging")]
61 message_bus: Arc::clone(&self.message_bus),
62 #[cfg(feature = "messaging")]
63 control_plane_bus: Arc::clone(&self.control_plane_bus),
64 #[cfg(feature = "messaging")]
65 redis_bridge: Arc::clone(&self.redis_bridge),
66 shutdown: Arc::clone(&self.shutdown),
67 state_manager: Arc::clone(&self.state_manager),
68 }
69 }
70}
71impl Context {
72 /// Create a new context
73 ///
74 /// # Arguments
75 ///
76 /// * `node_id` - Unique identifier for this node
77 ///
78 /// # Example
79 ///
80 /// ```rust
81 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82 /// let ctx = Context::new("my-node").await?;
83 /// # Ok(())
84 /// # }
85 /// ```
86 pub async fn new(node_id: &str) -> Result<Self> {
87 #[cfg(feature = "messaging")]
88 let (message_bus, control_plane_bus, redis_bridge) = {
89 // Load infrastructure config
90 let config = Self::load_infrastructure_config()?;
91
92 // Connect to local Redis
93 let redis_url = Self::get_redis_url_from_config(&config)?;
94 let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
95 .await
96 .with_context(|| {
97 format!(
98 "Failed to connect local message bus for node '{}' to Redis at '{}'",
99 node_id, redis_url
100 )
101 })
102 .map_err(|e| Mecha10Error::MessagingError {
103 message: format!("{:#}", e),
104 suggestion: "Ensure local Redis is running at the configured URL".to_string(),
105 })?;
106
107 // Connect to control plane Redis if configured
108 let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
109 let cp_url = &cp_config.url;
110 match mecha10_messaging::MessageBus::connect(cp_url, node_id).await {
111 Ok(bus) => {
112 tracing::info!(
113 "Connected to control plane Redis at '{}' for node '{}'",
114 cp_url,
115 node_id
116 );
117 Some(bus)
118 }
119 Err(e) => {
120 tracing::warn!(
121 "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
122 cp_url,
123 e
124 );
125 None
126 }
127 }
128 } else {
129 None
130 };
131
132 // Initialize Redis bridge if both connections are available
133 let bridge = if let Some(_cp_bus_ref) = &cp_bus {
134 // Get bridge configuration (use default if not specified)
135 let bridge_config = config.bridge.unwrap_or_default();
136
137 if bridge_config.enabled {
138 // Create bridge with separate clones of the buses
139 let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
140 .await
141 .map_err(|e| Mecha10Error::MessagingError {
142 message: format!("Failed to create bridge connection to local Redis: {}", e),
143 suggestion: "Check local Redis connection".to_string(),
144 })?;
145
146 let cp_url = config
147 .control_plane_redis
148 .as_ref()
149 .map(|c| c.url.clone())
150 .unwrap_or_default();
151 let bridge_cp_bus =
152 mecha10_messaging::MessageBus::connect(&cp_url, node_id)
153 .await
154 .map_err(|e| Mecha10Error::MessagingError {
155 message: format!("Failed to create bridge connection to control plane Redis: {}", e),
156 suggestion: "Check control plane Redis connection".to_string(),
157 })?;
158
159 // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
160 let redis_bridge_config = crate::redis_bridge::BridgeConfig {
161 enabled: bridge_config.enabled,
162 local_topics: bridge_config.local_topics,
163 bridged_topics: bridge_config.bridged_topics,
164 robot_id: bridge_config.robot_id,
165 };
166
167 let bridge =
168 crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
169
170 // Start the bridge
171 if let Err(e) = bridge.start().await {
172 tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
173 None
174 } else {
175 tracing::info!("Redis bridge started successfully");
176 Some(bridge)
177 }
178 } else {
179 tracing::debug!("Redis bridge is disabled in configuration");
180 None
181 }
182 } else {
183 None
184 };
185
186 (local_bus, cp_bus, bridge)
187 };
188
189 Ok(Self {
190 node_id: node_id.to_string(),
191 instance: None,
192 #[cfg(feature = "messaging")]
193 message_bus: Arc::new(RwLock::new(message_bus)),
194 #[cfg(feature = "messaging")]
195 control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
196 #[cfg(feature = "messaging")]
197 redis_bridge: Arc::new(RwLock::new(redis_bridge)),
198 shutdown: Arc::new(RwLock::new(false)),
199 state_manager: Arc::new(RwLock::new(None)),
200 })
201 }
202
203 /// Set an instance name for this context (builder pattern)
204 ///
205 /// This allows the same node implementation to run multiple instances
206 /// with different names. The node_id becomes "{node_name}/{instance}".
207 ///
208 /// # Example
209 ///
210 /// ```rust
211 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
212 /// // Single instance (no instance name)
213 /// let ctx = Context::new("camera_fake").await?;
214 /// // node_id: "camera_fake"
215 ///
216 /// // Multi-instance (with instance names)
217 /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
218 /// // node_id: "camera_fake/left"
219 ///
220 /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
221 /// // node_id: "camera_fake/right"
222 /// # Ok(())
223 /// # }
224 /// ```
225 pub fn with_instance(mut self, instance: &str) -> Self {
226 self.instance = Some(instance.to_string());
227 self.node_id = format!("{}/{}", self.node_id, instance);
228 self
229 }
230
231 /// Get the instance name if set
232 ///
233 /// Returns `None` for single-instance nodes, or `Some(instance)` for
234 /// multi-instance nodes.
235 pub fn instance(&self) -> Option<&str> {
236 self.instance.as_deref()
237 }
238
239 /// Load infrastructure configuration based on environment
240 ///
241 /// Priority order:
242 /// 1. mecha10.json (project configuration) - redis.url field
243 /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
244 /// 3. config/infrastructure.<environment>.yaml
245 ///
246 /// Set via MECHA10_ENVIRONMENT environment variable.
247 fn load_infrastructure_config() -> Result<InfrastructureConfig> {
248 use crate::config::load_config;
249 use std::env;
250 use std::path::PathBuf;
251
252 // 1. Try loading from mecha10.json FIRST (project configuration)
253 if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
254 return Ok(InfrastructureConfig {
255 redis: crate::context::infra_config::RedisConfig {
256 url: redis_url,
257 max_connections: 10,
258 connection_timeout_ms: 5000,
259 },
260 control_plane_redis: None,
261 bridge: None,
262 postgres: None,
263 mlflow: None,
264 minio: None,
265 });
266 }
267
268 // 2. Check for environment variable overrides
269 // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
270 if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
271 return Ok(InfrastructureConfig {
272 redis: crate::context::infra_config::RedisConfig {
273 url: redis_url,
274 max_connections: 10,
275 connection_timeout_ms: 5000,
276 },
277 control_plane_redis: None,
278 bridge: None,
279 postgres: None,
280 mlflow: None,
281 minio: None,
282 });
283 }
284
285 // 3. Load from infrastructure.yaml
286 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
287
288 let config_filename = format!("infrastructure.{}.yaml", environment);
289
290 // Try multiple paths in order:
291 // 1. Current directory (for running from workspace root)
292 // 2. Workspace root (for running tests from target/debug/deps)
293 // 3. Two levels up (for running tests from packages/core/target/debug/deps)
294 let paths_to_try = vec![
295 PathBuf::from(format!("config/{}", config_filename)),
296 PathBuf::from(format!("../../config/{}", config_filename)),
297 PathBuf::from(format!("../../../../config/{}", config_filename)),
298 ];
299
300 for path in &paths_to_try {
301 if path.exists() {
302 return load_config(path)
303 .with_context(|| {
304 format!(
305 "Failed to load infrastructure config for environment '{}' from '{}'",
306 environment,
307 path.display()
308 )
309 })
310 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
311 }
312 }
313
314 // If no config file found, return error with helpful message
315 Err(Mecha10Error::Configuration(format!(
316 "Failed to load infrastructure config for environment '{}'. \
317 Tried paths: {}. \
318 Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
319 or configure redis.url in mecha10.json.",
320 environment,
321 paths_to_try
322 .iter()
323 .map(|p| p.display().to_string())
324 .collect::<Vec<_>>()
325 .join(", ")
326 )))
327 }
328
329 /// Try to load Redis URL from mecha10.json
330 ///
331 /// Looks for mecha10.json in current directory and reads redis.url field.
332 fn try_load_redis_from_mecha10_json() -> Result<String> {
333 use std::path::PathBuf;
334
335 // Try multiple paths to find mecha10.json
336 let paths_to_try = vec![
337 PathBuf::from("mecha10.json"),
338 PathBuf::from("../mecha10.json"),
339 PathBuf::from("../../mecha10.json"),
340 ];
341
342 for path in &paths_to_try {
343 if !path.exists() {
344 continue;
345 }
346
347 // Read and parse mecha10.json
348 let content = std::fs::read_to_string(path)
349 .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
350
351 let json: serde_json::Value = serde_json::from_str(&content)
352 .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
353
354 // Extract redis.url
355 if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
356 return Ok(redis_url.to_string());
357 }
358 }
359
360 Err(Mecha10Error::Configuration(
361 "No mecha10.json found with redis.url configuration".to_string(),
362 ))
363 }
364
365 /// Get Redis URL from infrastructure configuration (helper for internal use)
366 fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
367 // First try environment variables (for backward compatibility and testing)
368 if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
369 return Ok(url);
370 }
371 if let Ok(url) = std::env::var("REDIS_URL") {
372 return Ok(url);
373 }
374
375 // Use config
376 Ok(config.redis.url.clone())
377 }
378
379 /// Get Redis URL from infrastructure configuration
380 pub fn get_redis_url() -> Result<String> {
381 let config = Self::load_infrastructure_config()?;
382 Self::get_redis_url_from_config(&config)
383 }
384
385 /// Get the node ID
386 pub fn node_id(&self) -> &str {
387 &self.node_id
388 }
389
390 /// Load node configuration with environment-aware file-level fallback
391 ///
392 /// This method implements the V2 configuration system with:
393 /// - Environment-specific configs (dev/staging/production)
394 /// - File-level fallback (no key-by-key merging)
395 /// - Automatic path resolution
396 ///
397 /// **Fallback Priority:**
398 /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
399 /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
400 /// 3. `T::default()` (code default)
401 ///
402 /// **Environment Detection:**
403 /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
404 /// - Supported values: "dev", "staging", "production", or custom
405 ///
406 /// # Type Parameters
407 ///
408 /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
409 ///
410 /// # Arguments
411 ///
412 /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
413 ///
414 /// # Returns
415 ///
416 /// The loaded configuration of type `T`, using the first available source
417 /// in the fallback chain.
418 ///
419 /// # Example
420 ///
421 /// ```rust
422 /// use mecha10::prelude::*;
423 /// use serde::{Deserialize, Serialize};
424 ///
425 /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
426 /// struct SimBridgeConfig {
427 /// camera_width: u32,
428 /// camera_height: u32,
429 /// camera_fps: u32,
430 /// }
431 ///
432 /// # async fn example(ctx: &Context) -> Result<()> {
433 /// // Loads from configs/{env}/nodes/simulation-bridge/config.json
434 /// // Falls back to configs/common/nodes/simulation-bridge/config.json
435 /// // Falls back to SimBridgeConfig::default()
436 /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
437 /// # Ok(())
438 /// # }
439 /// ```
440 pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
441 where
442 T: DeserializeOwned + Default,
443 {
444 use crate::config::load_config;
445 use std::env;
446 use std::path::PathBuf;
447
448 // Get environment from env var (default: "dev")
449 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
450
451 // Try multiple base paths (for running from different locations)
452 let base_paths = vec![
453 PathBuf::from("."), // Current directory
454 PathBuf::from("../.."), // From target/debug/deps
455 PathBuf::from("../../../.."), // From packages/*/target/debug/deps
456 ];
457
458 // Build fallback chain for each base path
459 for base_path in &base_paths {
460 // 1. Try environment-specific config
461 let env_config_path = base_path
462 .join("configs")
463 .join(&environment)
464 .join("nodes")
465 .join(node_name)
466 .join("config.json");
467
468 if env_config_path.exists() {
469 tracing::debug!(
470 "Loading node config for '{}' from environment-specific path: {}",
471 node_name,
472 env_config_path.display()
473 );
474 return load_config(&env_config_path)
475 .with_context(|| {
476 format!(
477 "Failed to load node config for '{}' from '{}'",
478 node_name,
479 env_config_path.display()
480 )
481 })
482 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
483 }
484
485 // 2. Try common config
486 let common_config_path = base_path
487 .join("configs")
488 .join("common")
489 .join("nodes")
490 .join(node_name)
491 .join("config.json");
492
493 if common_config_path.exists() {
494 tracing::debug!(
495 "Loading node config for '{}' from common path: {}",
496 node_name,
497 common_config_path.display()
498 );
499 return load_config(&common_config_path)
500 .with_context(|| {
501 format!(
502 "Failed to load node config for '{}' from '{}'",
503 node_name,
504 common_config_path.display()
505 )
506 })
507 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
508 }
509
510 // 3. Try new flat structure: configs/nodes/@mecha10/{node_name}/config.json
511 let flat_config_path = base_path
512 .join("configs")
513 .join("nodes")
514 .join("@mecha10")
515 .join(node_name)
516 .join("config.json");
517
518 if flat_config_path.exists() {
519 tracing::debug!(
520 "Loading node config for '{}' from flat path: {}",
521 node_name,
522 flat_config_path.display()
523 );
524 return load_config(&flat_config_path)
525 .with_context(|| {
526 format!(
527 "Failed to load node config for '{}' from '{}'",
528 node_name,
529 flat_config_path.display()
530 )
531 })
532 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
533 }
534 }
535
536 // 3. Fall back to default
537 tracing::debug!(
538 "No config file found for node '{}', using Default implementation",
539 node_name
540 );
541 Ok(T::default())
542 }
543
544 /// Check if control plane Redis connection is available
545 #[cfg(feature = "messaging")]
546 pub async fn has_control_plane(&self) -> bool {
547 self.control_plane_bus.read().await.is_some()
548 }
549
550 /// Get control plane Redis URL from infrastructure configuration
551 pub fn get_control_plane_redis_url() -> Result<Option<String>> {
552 let config = Self::load_infrastructure_config()?;
553 Ok(config.control_plane_redis.map(|c| c.url))
554 }
555
556 /// Subscribe to a topic with type-safe message handling
557 ///
558 /// # Arguments
559 ///
560 /// * `topic` - Type-safe topic to subscribe to
561 ///
562 /// # Returns
563 ///
564 /// A receiver that will yield messages of type `T`
565 ///
566 /// # Example
567 ///
568 /// ```rust
569 /// use mecha10::prelude::*;
570 /// use mecha10::topics::sensor;
571 ///
572 /// # async fn example(ctx: &Context) -> Result<()> {
573 /// // Type is inferred as Receiver<Image>
574 /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
575 ///
576 /// while let Some(image) = images.recv().await {
577 /// println!("Image: {}x{}", image.width, image.height);
578 /// }
579 /// # Ok(())
580 /// # }
581 /// ```
582 #[cfg(feature = "messaging")]
583 pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
584 where
585 T: Message + DeserializeOwned + Send + 'static,
586 {
587 let mut bus = self.message_bus.write().await;
588
589 // Create a unique consumer group for this node
590 let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
591
592 let subscriber = bus
593 .subscribe::<T>(topic.path(), &consumer_group)
594 .await
595 .with_context(|| {
596 format!(
597 "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
598 topic.path(),
599 consumer_group,
600 self.node_id
601 )
602 })
603 .map_err(|e| Mecha10Error::MessagingError {
604 message: format!("{:#}", e),
605 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
606 })?;
607
608 // Convert to our Receiver type
609 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
610
611 // Spawn task to forward messages
612 tokio::spawn(async move {
613 let mut sub = subscriber;
614 while let Some(msg) = sub.recv().await {
615 // Auto-acknowledge message before moving payload
616 if let Err(e) = msg.ack().await {
617 tracing::warn!(
618 topic = %msg.topic,
619 message_id = %msg.id,
620 error = %e,
621 "Failed to acknowledge message (will be redelivered on reconnect)"
622 );
623 }
624 // Extract payload and forward
625 if tx.send(msg.payload).is_err() {
626 break; // Receiver dropped
627 }
628 }
629 });
630
631 Ok(Receiver::from(rx))
632 }
633
634 /// Publish a message to a specific topic
635 ///
636 /// # Arguments
637 ///
638 /// * `topic` - Type-safe topic to publish to
639 /// * `message` - Message to publish (type checked against topic)
640 ///
641 /// # Example
642 ///
643 /// ```rust
644 /// use mecha10::prelude::*;
645 /// use mecha10::topics::sensor;
646 /// use mecha10::messages::Image;
647 ///
648 /// # async fn example(ctx: &Context) -> Result<()> {
649 /// let image = Image {
650 /// timestamp: now_micros(),
651 /// width: 640,
652 /// height: 480,
653 /// encoding: "rgb8".to_string(),
654 /// data: vec![],
655 /// };
656 ///
657 /// // Type checked: image must be of type Image
658 /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
659 /// # Ok(())
660 /// # }
661 /// ```
662 #[cfg(feature = "messaging")]
663 pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
664 where
665 T: Message + Serialize,
666 {
667 let mut bus = self.message_bus.write().await;
668
669 bus.publish(topic.path(), message)
670 .await
671 .with_context(|| {
672 format!(
673 "Failed to publish to topic '{}' from node '{}'",
674 topic.path(),
675 self.node_id
676 )
677 })
678 .map_err(|e| Mecha10Error::MessagingError {
679 message: format!("{:#}", e),
680 suggestion: "Check that Redis is running and accepting connections".to_string(),
681 })?;
682
683 Ok(())
684 }
685
686 /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
687 ///
688 /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
689 ///
690 /// # Arguments
691 /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
692 /// * `message` - The message to publish
693 pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
694 where
695 T: Message + Serialize,
696 {
697 let mut bus = self.message_bus.write().await;
698
699 bus.publish(topic_path, message)
700 .await
701 .with_context(|| {
702 format!(
703 "Failed to publish to topic '{}' from node '{}'",
704 topic_path, self.node_id
705 )
706 })
707 .map_err(|e| Mecha10Error::MessagingError {
708 message: format!("{:#}", e),
709 suggestion: "Check that Redis is running and accepting connections".to_string(),
710 })?;
711
712 Ok(())
713 }
714
715 /// Publish a message to an instance-scoped topic
716 ///
717 /// This automatically appends the context's instance name to the topic path.
718 /// If no instance is set, this behaves like `publish_to()`.
719 ///
720 /// # Arguments
721 ///
722 /// * `topic` - Type-safe topic to publish to
723 /// * `message` - Message to publish (type checked against topic)
724 ///
725 /// # Example
726 ///
727 /// ```rust
728 /// use mecha10::prelude::*;
729 /// use mecha10::topics::sensor;
730 /// use mecha10::messages::Image;
731 ///
732 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
733 /// let ctx = Context::new("camera_fake").await?.with_instance("left");
734 ///
735 /// let image = Image {
736 /// timestamp: now_micros(),
737 /// width: 640,
738 /// height: 480,
739 /// encoding: "rgb8".to_string(),
740 /// data: vec![],
741 /// };
742 ///
743 /// // Publishes to "/sensor/camera/rgb/left"
744 /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
745 /// # Ok(())
746 /// # }
747 /// ```
748 #[cfg(feature = "messaging")]
749 pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
750 where
751 T: Message + Serialize,
752 {
753 let scoped_path = if let Some(instance) = &self.instance {
754 format!("{}/{}", topic.path(), instance)
755 } else {
756 topic.path().to_string()
757 };
758
759 self.publish_raw(&scoped_path, message).await
760 }
761
762 /// Publish a message to a topic with an explicit instance suffix
763 ///
764 /// This allows publishing to a specific instance's topic regardless of
765 /// the context's own instance name.
766 ///
767 /// # Arguments
768 ///
769 /// * `topic` - Type-safe topic to publish to
770 /// * `instance` - Instance name to append to topic path
771 /// * `message` - Message to publish (type checked against topic)
772 ///
773 /// # Example
774 ///
775 /// ```rust
776 /// use mecha10::prelude::*;
777 /// use mecha10::topics::actuator;
778 ///
779 /// # async fn example(ctx: &Context) -> Result<()> {
780 /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
781 ///
782 /// // Send to specific motor instances
783 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
784 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
785 /// # Ok(())
786 /// # }
787 /// ```
788 #[cfg(feature = "messaging")]
789 pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
790 where
791 T: Message + Serialize,
792 {
793 let instance_path = format!("{}/{}", topic.path(), instance);
794 self.publish_raw(&instance_path, message).await
795 }
796
797 /// Subscribe to an instance-scoped topic
798 ///
799 /// This automatically appends the context's instance name to the topic path.
800 /// If no instance is set, this behaves like `subscribe()`.
801 ///
802 /// # Example
803 ///
804 /// ```rust
805 /// use mecha10::prelude::*;
806 /// use mecha10::topics::sensor;
807 ///
808 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
809 /// let ctx = Context::new("vision_node").await?;
810 ///
811 /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
812 /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
813 ///
814 /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
815 /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
816 /// # Ok(())
817 /// # }
818 /// ```
819 #[cfg(feature = "messaging")]
820 pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
821 where
822 T: Message + DeserializeOwned + Send + 'static,
823 {
824 let scoped_path = format!("{}/{}", topic.path(), instance);
825 self.subscribe_raw(&scoped_path).await
826 }
827
828 /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
829 ///
830 /// # Arguments
831 ///
832 /// * `topic_path` - Raw topic path string
833 ///
834 /// # Returns
835 ///
836 /// A receiver that will yield messages of type `T`
837 ///
838 /// # Note
839 ///
840 /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
841 /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
842 #[cfg(feature = "messaging")]
843 pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
844 where
845 T: Message + DeserializeOwned + Send + 'static,
846 {
847 let mut bus = self.message_bus.write().await;
848
849 // Create a unique consumer group for this node
850 let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
851
852 let subscriber = bus
853 .subscribe::<T>(topic_path, &consumer_group)
854 .await
855 .with_context(|| {
856 format!(
857 "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
858 topic_path, consumer_group, self.node_id
859 )
860 })
861 .map_err(|e| Mecha10Error::MessagingError {
862 message: format!("{:#}", e),
863 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
864 })?;
865
866 // Convert to our Receiver type
867 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
868
869 // Spawn task to forward messages
870 tokio::spawn(async move {
871 let mut sub = subscriber;
872 while let Some(msg) = sub.recv().await {
873 // Auto-acknowledge message before moving payload
874 if let Err(e) = msg.ack().await {
875 tracing::warn!(
876 topic = %msg.topic,
877 message_id = %msg.id,
878 error = %e,
879 "Failed to acknowledge message (will be redelivered on reconnect)"
880 );
881 }
882 // Extract payload and forward
883 if tx.send(msg.payload).is_err() {
884 break; // Receiver dropped
885 }
886 }
887 });
888
889 Ok(Receiver::from(rx))
890 }
891
892 /// Discover topics matching a glob pattern
893 ///
894 /// Queries Redis for all topics matching the given pattern without subscribing.
895 /// Useful for inspecting available topics before subscribing.
896 ///
897 /// # Pattern Syntax
898 ///
899 /// - `*` matches any sequence of characters within a path segment
900 /// - `**` matches any sequence of path segments
901 /// - `{a,b}` matches either a or b (future enhancement)
902 ///
903 /// # Arguments
904 ///
905 /// * `pattern` - Glob pattern to match topics against
906 ///
907 /// # Returns
908 ///
909 /// A vector of topic paths that match the pattern
910 ///
911 /// # Example
912 ///
913 /// ```rust,no_run
914 /// use mecha10::prelude::*;
915 ///
916 /// # async fn example(ctx: &Context) -> Result<()> {
917 /// // Discover all camera topics
918 /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
919 /// println!("Found {} camera topics: {:?}", topics.len(), topics);
920 /// # Ok(())
921 /// # }
922 /// ```
923 #[cfg(feature = "messaging")]
924 pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
925 use crate::aggregated::pattern::convert_glob_to_redis;
926
927 let bus = self.message_bus.read().await;
928 let redis_pattern = convert_glob_to_redis(pattern);
929
930 bus.discover_topics(&redis_pattern)
931 .await
932 .with_context(|| {
933 format!(
934 "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
935 pattern, redis_pattern, self.node_id
936 )
937 })
938 .map_err(|e| Mecha10Error::MessagingError {
939 message: format!("{:#}", e),
940 suggestion: "Check that Redis is running and the pattern is valid".to_string(),
941 })
942 }
943
944 /// Subscribe to multiple topics matching a glob pattern
945 ///
946 /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
947 /// Each message is tagged with its source topic path.
948 ///
949 /// # Pattern Syntax
950 ///
951 /// - `*` matches any sequence of characters within a path segment
952 /// - `**` matches any sequence of path segments
953 /// - `{a,b}` matches either a or b (future enhancement)
954 ///
955 /// # Arguments
956 ///
957 /// * `pattern` - Glob pattern to match topics against
958 ///
959 /// # Returns
960 ///
961 /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
962 ///
963 /// # Examples
964 ///
965 /// ```rust,no_run
966 /// use mecha10::prelude::*;
967 ///
968 /// # async fn example(ctx: &Context) -> Result<()> {
969 /// // Subscribe to all cameras with one call
970 /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
971 ///
972 /// while let Some((topic, image)) = cameras.recv().await {
973 /// let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
974 /// println!("Camera {}: {}x{}", camera_name, image.width, image.height);
975 /// }
976 /// # Ok(())
977 /// # }
978 /// ```
#[cfg(feature = "messaging")]
pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
where
    T: Message + DeserializeOwned + Send + 'static,
{
    // Resolve the glob pattern to the concrete topic paths it currently matches.
    let matching_topics = self.discover_topics(pattern).await?;

    // No match is reported as an error rather than returning an empty receiver.
    if matching_topics.is_empty() {
        return Err(Mecha10Error::MessagingError {
            message: format!("No topics found matching pattern: {}", pattern),
            suggestion: "Check that the pattern is correct and topics exist".to_string(),
        });
    }

    // Subscribe to each matched topic. The first failing subscription aborts
    // the whole call through `?`.
    let mut receivers = Vec::with_capacity(matching_topics.len());
    for topic_path in matching_topics {
        let rx = self.subscribe_raw::<T>(&topic_path).await?;

        // Re-wrap the underlying channel in the aggregated module's enum so
        // bounded and unbounded subscriptions can be mixed in one receiver.
        let receiver_type = match rx.inner {
            crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
                crate::aggregated::ReceiverType::Unbounded(unbounded)
            }
            crate::context::receiver::ReceiverInner::Bounded(bounded) => {
                crate::aggregated::ReceiverType::Bounded(bounded)
            }
        };

        // `topic_path` is owned by this iteration and not used afterwards, so
        // it is moved into the pair instead of being cloned.
        receivers.push((topic_path, receiver_type));
    }

    // Hand the tagged receivers over together with the pattern they came from.
    Ok(crate::aggregated::AggregatedReceiver::new_mixed(
        pattern.to_string(),
        receivers,
    ))
}
1016
/// Publishes `message` on the topic named by the raw string `topic_path`.
///
/// # Arguments
///
/// * `topic_path` - Topic path given as a plain string
/// * `message` - Message to send
///
/// # Errors
///
/// Returns `Mecha10Error::MessagingError` when the message bus fails to
/// publish. The error text names both the topic path and this node's id.
///
/// # Note
///
/// This is the low-level entry point intended for RPC and dynamically built
/// topic names. For ordinary use prefer `publish_to()` with a type-safe topic.
#[cfg(feature = "messaging")]
pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
where
    T: Message + Serialize,
{
    let mut guard = self.message_bus.write().await;

    // Attach the topic and node id to whatever error the bus reports.
    let outcome = guard.publish(topic_path, message).await.with_context(|| {
        format!(
            "Failed to publish to raw topic '{}' from node '{}'",
            topic_path, self.node_id
        )
    });

    match outcome {
        Ok(_) => Ok(()),
        // `{:#}` renders the full context chain on one line.
        Err(err) => Err(Mecha10Error::MessagingError {
            message: format!("{:#}", err),
            suggestion: "Check that Redis is running and accepting connections".to_string(),
        }),
    }
}
1050
1051 /// Check if shutdown has been requested
1052 pub async fn is_shutdown(&self) -> bool {
1053 *self.shutdown.read().await
1054 }
1055
1056 /// Request shutdown
1057 pub async fn shutdown(&self) {
1058 *self.shutdown.write().await = true;
1059
1060 // Stop Redis bridge if running
1061 #[cfg(feature = "messaging")]
1062 {
1063 let bridge_opt = self.redis_bridge.read().await;
1064 if let Some(bridge) = bridge_opt.as_ref() {
1065 if let Err(e) = bridge.stop().await {
1066 tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1067 }
1068 }
1069 }
1070 }
1071
1072 /// Wait for shutdown signal
1073 pub async fn wait_for_shutdown(&self) {
1074 loop {
1075 if self.is_shutdown().await {
1076 break;
1077 }
1078 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1079 }
1080 }
1081
1082 /// Enter a logging span with this node's context
1083 ///
1084 /// This returns a guard that, when entered, adds the node_id to all log messages.
1085 /// The span is automatically exited when the guard is dropped.
1086 ///
1087 /// # Example
1088 ///
1089 /// ```rust
1090 /// use mecha10::prelude::*;
1091 ///
1092 /// # async fn example(ctx: &Context) -> Result<()> {
1093 /// let _guard = ctx.logging_span().entered();
1094 /// info!("This message includes node_id automatically");
1095 /// debug!("So does this one");
1096 /// # Ok(())
1097 /// # }
1098 /// ```
1099 pub fn logging_span(&self) -> tracing::Span {
1100 tracing::info_span!("node", node_id = %self.node_id)
1101 }
1102
/// Subscribes to every camera topic in one call.
///
/// Shorthand for `subscribe_aggregated::<Image>("/sensor/camera/*")`.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let mut cameras = ctx.subscribe_all_cameras().await?;
/// while let Some((topic, image)) = cameras.recv().await {
///     let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
///     println!("Camera {}: {}x{}", camera_name, image.width, image.height);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
    const CAMERA_PATTERN: &str = "/sensor/camera/*";
    self.subscribe_aggregated::<crate::types::Image>(CAMERA_PATTERN).await
}
1125
/// Discovers all sensor topics.
///
/// Returns the paths matched by the pattern `/sensor/**`. This method only
/// lists topic paths; it does not subscribe to them.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let topics = ctx.discover_all_sensors().await?;
/// println!("Found {} sensor topics", topics.len());
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
    const ALL_SENSORS_PATTERN: &str = "/sensor/**";
    self.discover_topics(ALL_SENSORS_PATTERN).await
}
1146
/// Subscribes to every LiDAR topic in one call.
///
/// Shorthand for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`.
#[cfg(feature = "messaging")]
pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
    const LIDAR_PATTERN: &str = "/sensor/lidar/*";
    self.subscribe_aggregated::<crate::types::LaserScan>(LIDAR_PATTERN).await
}
1154
/// Subscribes to every IMU topic in one call.
///
/// Shorthand for `subscribe_aggregated::<IMU>("/sensor/imu/*")`.
#[cfg(feature = "messaging")]
pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
    const IMU_PATTERN: &str = "/sensor/imu/*";
    self.subscribe_aggregated::<crate::types::IMU>(IMU_PATTERN).await
}
1162
/// Subscribes to every motor topic in one call.
///
/// Shorthand for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`.
#[cfg(feature = "messaging")]
pub async fn subscribe_all_motors(
    &self,
) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
    const MOTOR_PATTERN: &str = "/actuator/motor/*";
    self.subscribe_aggregated::<crate::actuator::MotorStatus>(MOTOR_PATTERN)
        .await
}
1172
/// Subscribes to all topics matching `pattern` beneath a typed topic's path.
///
/// The pattern that is actually subscribed to is `<topic path>/<pattern>`.
/// Exactly one `/` separates the two parts: trailing slashes on the topic path
/// and leading slashes on `pattern` are stripped before joining, so passing
/// `"/front"` behaves like `"front"` instead of producing `//`.
///
/// # Arguments
///
/// * `topic` - Typed topic whose path is used as the prefix
/// * `pattern` - Glob pattern appended after the topic path
///
/// # Errors
///
/// Returns whatever `subscribe_aggregated` returns for the joined pattern,
/// including the error raised when no topic matches.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
/// use mecha10::topics::sensor;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// // Subscribe to all instances of a specific topic type
/// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn subscribe_pattern<T>(
    &self,
    topic: crate::topics::Topic<T>,
    pattern: &str,
) -> Result<crate::aggregated::AggregatedReceiver<T>>
where
    T: Message + DeserializeOwned + Send + 'static,
{
    // Render the path through `Display` (as the original `format!` did) so
    // this does not depend on the concrete return type of `path()`.
    let base = topic.path().to_string();

    // Normalise the boundary so the join never yields a doubled slash.
    let full_pattern = format!(
        "{}/{}",
        base.trim_end_matches('/'),
        pattern.trim_start_matches('/')
    );

    self.subscribe_aggregated(&full_pattern).await
}
1201
1202 /// Get or create a state manager for this node
1203 ///
1204 /// The state manager provides persistent storage for node state across restarts.
1205 /// By default, it uses filesystem storage in `./state/` directory.
1206 ///
1207 /// # Example
1208 ///
1209 /// ```rust,no_run
1210 /// use mecha10::prelude::*;
1211 /// use serde::{Deserialize, Serialize};
1212 ///
1213 /// #[derive(Debug, Clone, Serialize, Deserialize)]
1214 /// struct MyState {
1215 /// counter: u64,
1216 /// position: (f64, f64),
1217 /// }
1218 ///
1219 /// # async fn example(ctx: &Context) -> Result<()> {
1220 /// let state_mgr = ctx.state_manager().await?;
1221 ///
1222 /// // Load previous state
1223 /// let mut state = state_mgr
1224 /// .load::<MyState>("my_node")
1225 /// .await?
1226 /// .unwrap_or(MyState {
1227 /// counter: 0,
1228 /// position: (0.0, 0.0),
1229 /// });
1230 ///
1231 /// // Update state
1232 /// state.counter += 1;
1233 ///
1234 /// // Save state
1235 /// state_mgr.save("my_node", &state).await?;
1236 /// # Ok(())
1237 /// # }
1238 /// ```
1239 pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1240 let mut mgr = self.state_manager.write().await;
1241
1242 if let Some(existing) = mgr.as_ref() {
1243 return Ok(Arc::clone(existing));
1244 }
1245
1246 // Create default filesystem-based state manager
1247 let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1248 let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1249 let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1250
1251 *mgr = Some(Arc::clone(&arc_mgr));
1252
1253 Ok(arc_mgr)
1254 }
1255
1256 /// Set a custom state manager for this context
1257 ///
1258 /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1259 ///
1260 /// # Example
1261 ///
1262 /// ```rust,no_run
1263 /// use mecha10::prelude::*;
1264 /// use std::sync::Arc;
1265 ///
1266 /// # async fn example(ctx: &Context) -> Result<()> {
1267 /// // Use in-memory state manager for testing
1268 /// let mem_mgr = Arc::new(MemoryStateManager::new());
1269 /// ctx.set_state_manager(mem_mgr).await;
1270 /// # Ok(())
1271 /// # }
1272 /// ```
1273 pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1274 let mut mgr = self.state_manager.write().await;
1275 *mgr = Some(manager);
1276 }
1277
1278 // ========================================
1279 // Topic Discovery Helpers
1280 // ========================================
1281
/// Lists all sensor topics currently active in the system.
///
/// Returns every topic under `/sensor/` — cameras, LiDAR, IMU, GPS, etc.
/// Discovery uses the pattern `/sensor/**`, so topics nested more than one
/// level deep (for example `/sensor/camera/front`) are part of the result.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let sensors = ctx.list_sensor_topics().await?;
/// println!("Available sensors:");
/// for topic in sensors {
///     println!("  - {}", topic);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
    // Per the pattern syntax documented on `subscribe_aggregated`, a single
    // `*` stays within one path segment; `**` is required to reach topics
    // such as `/sensor/camera/<name>`. Same pattern as `discover_all_sensors`.
    self.discover_topics("/sensor/**").await
}
1304
/// Lists all actuator topics currently active in the system.
///
/// Returns every topic under `/actuator/` — motors, servos, grippers, etc.
/// Discovery uses the pattern `/actuator/**`, so topics nested more than one
/// level deep (for example `/actuator/motor/left`) are part of the result.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let actuators = ctx.list_actuator_topics().await?;
/// println!("Available actuators:");
/// for topic in actuators {
///     println!("  - {}", topic);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
    // Per the pattern syntax documented on `subscribe_aggregated`, a single
    // `*` stays within one path segment; `**` is required to reach topics
    // such as `/actuator/motor/<name>`.
    self.discover_topics("/actuator/**").await
}
1327
/// Lists all topics in the system.
///
/// Returns every active topic regardless of category. Discovery uses the
/// pattern `**` so that multi-segment paths are matched.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let all_topics = ctx.list_all_topics().await?;
/// println!("All active topics ({} total):", all_topics.len());
/// for topic in all_topics {
///     println!("  - {}", topic);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn list_all_topics(&self) -> Result<Vec<String>> {
    // Per the pattern syntax documented on `subscribe_aggregated`, a bare `*`
    // cannot cross a `/`, which would exclude paths like `/sensor/imu/base`;
    // `**` matches any sequence of path segments.
    // NOTE(review): confirm `discover_topics` accepts a bare `**` pattern.
    self.discover_topics("**").await
}
1350
/// Lists the camera topics.
///
/// Returns the topics matched by the pattern `/sensor/camera/*`.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let cameras = ctx.list_camera_topics().await?;
/// println!("Available cameras:");
/// for topic in cameras {
///     println!("  - {}", topic);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
    const CAMERA_PATTERN: &str = "/sensor/camera/*";
    self.discover_topics(CAMERA_PATTERN).await
}
1373
/// Lists the LiDAR topics.
///
/// Returns the topics matched by the pattern `/sensor/lidar/*`.
#[cfg(feature = "messaging")]
pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
    const LIDAR_PATTERN: &str = "/sensor/lidar/*";
    self.discover_topics(LIDAR_PATTERN).await
}
1381
/// Lists the IMU topics.
///
/// Returns the topics matched by the pattern `/sensor/imu/*`.
#[cfg(feature = "messaging")]
pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
    const IMU_PATTERN: &str = "/sensor/imu/*";
    self.discover_topics(IMU_PATTERN).await
}
1389
/// Lists the motor topics.
///
/// Returns the topics matched by the pattern `/actuator/motor/*`.
#[cfg(feature = "messaging")]
pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
    const MOTOR_PATTERN: &str = "/actuator/motor/*";
    self.discover_topics(MOTOR_PATTERN).await
}
1397
/// Prints a human-readable summary of the active topics to stdout.
///
/// Intended for debugging and introspection. Topics are grouped into
/// "Sensors", "Actuators" and "Other" (everything that does not start with
/// `/sensor/` or `/actuator/`); a group with no topics is omitted.
///
/// # Errors
///
/// Returns the error from listing all topics. A failure while listing sensor
/// or actuator topics is not reported; that group is treated as empty.
///
/// # Example
///
/// ```rust,no_run
/// use mecha10::prelude::*;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// ctx.print_topic_summary().await?;
/// // Output:
/// // === Active Topics ===
/// // Sensors (3):
/// // - /sensor/camera/front
/// // - /sensor/lidar/main
/// // - /sensor/imu/base
/// // Actuators (2):
/// // - /actuator/motor/left
/// // - /actuator/motor/right
/// // =====================
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "messaging")]
pub async fn print_topic_summary(&self) -> Result<()> {
    // Gather all three listings before printing, so an error from the
    // full listing is returned before any output is produced.
    let sensors = self.list_sensor_topics().await.unwrap_or_default();
    let actuators = self.list_actuator_topics().await.unwrap_or_default();

    // Whatever is neither a sensor nor an actuator path goes under "Other".
    let mut other_topics = self.list_all_topics().await?;
    other_topics.retain(|t| !(t.starts_with("/sensor/") || t.starts_with("/actuator/")));

    // Prints one titled group; empty groups produce no output at all.
    let print_section = |title: &str, topics: &[String]| {
        if topics.is_empty() {
            return;
        }
        println!("{} ({}):", title, topics.len());
        for topic in topics {
            println!(" - {}", topic);
        }
    };

    println!("=== Active Topics ===");
    print_section("Sensors", &sensors);
    print_section("Actuators", &actuators);
    print_section("Other", &other_topics);
    println!("=====================");

    Ok(())
}
1461}