mecha10_core/context/mod.rs
1//! Node execution context
2//!
3//! Provides a high-level API for nodes to interact with the framework,
4//! including type-safe pub/sub, configuration access, and lifecycle management.
5//!
6//! This module has been refactored into smaller sub-modules for better organization:
7//! - `infra_config`: Infrastructure configuration types
8//! - `receiver`: Message receiver wrapper
9
10mod builder;
11mod infra_config;
12mod receiver;
13
14pub use builder::ContextBuilder;
15pub use infra_config::{BridgeConfig, InfrastructureConfig, MinioConfig, MlflowConfig, PostgresConfig, RedisConfig};
16pub use receiver::Receiver;
17
18use crate::error::{Mecha10Error, Result};
19use crate::messages::Message;
20use crate::state::FilesystemStateManager;
21use crate::topics::Topic;
22use anyhow::Context as _;
23use serde::{de::DeserializeOwned, Serialize};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27// Import the complete Context struct and all implementations from the backup file
28// This maintains backward compatibility while improving code organization
29
30pub struct Context {
31 /// Node identifier (includes instance if set: "node_name" or "node_name/instance")
32 node_id: String,
33
34 /// Optional instance name
35 instance: Option<String>,
36
37 /// Local message bus for on-robot communication (wrapped in Arc for sharing across tasks)
38 #[cfg(feature = "messaging")]
39 message_bus: Arc<RwLock<mecha10_messaging::MessageBus>>,
40
41 /// Control plane message bus for robot-to-cloud communication (optional)
42 #[cfg(feature = "messaging")]
43 control_plane_bus: Arc<RwLock<Option<mecha10_messaging::MessageBus>>>,
44
45 /// Redis bridge for forwarding messages between local and control plane
46 #[cfg(feature = "messaging")]
47 redis_bridge: Arc<RwLock<Option<crate::redis_bridge::RedisBridge>>>,
48
49 /// Shutdown signal
50 shutdown: Arc<RwLock<bool>>,
51
52 /// State manager for persistent node state (lazy initialized)
53 state_manager: Arc<RwLock<Option<Arc<crate::state::ConcreteStateManager>>>>,
54}
55impl Clone for Context {
56 fn clone(&self) -> Self {
57 Self {
58 node_id: self.node_id.clone(),
59 instance: self.instance.clone(),
60 #[cfg(feature = "messaging")]
61 message_bus: Arc::clone(&self.message_bus),
62 #[cfg(feature = "messaging")]
63 control_plane_bus: Arc::clone(&self.control_plane_bus),
64 #[cfg(feature = "messaging")]
65 redis_bridge: Arc::clone(&self.redis_bridge),
66 shutdown: Arc::clone(&self.shutdown),
67 state_manager: Arc::clone(&self.state_manager),
68 }
69 }
70}
71impl Context {
72 /// Create a new context
73 ///
74 /// # Arguments
75 ///
76 /// * `node_id` - Unique identifier for this node
77 ///
78 /// # Example
79 ///
80 /// ```rust
81 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82 /// let ctx = Context::new("my-node").await?;
83 /// # Ok(())
84 /// # }
85 /// ```
86 pub async fn new(node_id: &str) -> Result<Self> {
87 #[cfg(feature = "messaging")]
88 let (message_bus, control_plane_bus, redis_bridge) = {
89 // Load infrastructure config
90 let config = Self::load_infrastructure_config()?;
91
92 // Connect to local Redis
93 let redis_url = Self::get_redis_url_from_config(&config)?;
94 let local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
95 .await
96 .with_context(|| {
97 format!(
98 "Failed to connect local message bus for node '{}' to Redis at '{}'",
99 node_id, redis_url
100 )
101 })
102 .map_err(|e| Mecha10Error::MessagingError {
103 message: format!("{:#}", e),
104 suggestion: "Ensure local Redis is running at the configured URL".to_string(),
105 })?;
106
107 // Connect to control plane Redis if configured
108 let cp_bus = if let Some(cp_config) = &config.control_plane_redis {
109 let cp_url = &cp_config.url;
110 match mecha10_messaging::MessageBus::connect(cp_url, node_id).await {
111 Ok(bus) => {
112 tracing::info!(
113 "Connected to control plane Redis at '{}' for node '{}'",
114 cp_url,
115 node_id
116 );
117 Some(bus)
118 }
119 Err(e) => {
120 tracing::warn!(
121 "Failed to connect to control plane Redis at '{}': {}. Continuing with local Redis only.",
122 cp_url,
123 e
124 );
125 None
126 }
127 }
128 } else {
129 None
130 };
131
132 // Initialize Redis bridge if both connections are available
133 let bridge = if let Some(_cp_bus_ref) = &cp_bus {
134 // Get bridge configuration (use default if not specified)
135 let bridge_config = config.bridge.unwrap_or_default();
136
137 if bridge_config.enabled {
138 // Create bridge with separate clones of the buses
139 let bridge_local_bus = mecha10_messaging::MessageBus::connect(&redis_url, node_id)
140 .await
141 .map_err(|e| Mecha10Error::MessagingError {
142 message: format!("Failed to create bridge connection to local Redis: {}", e),
143 suggestion: "Check local Redis connection".to_string(),
144 })?;
145
146 let cp_url = config
147 .control_plane_redis
148 .as_ref()
149 .map(|c| c.url.clone())
150 .unwrap_or_default();
151 let bridge_cp_bus =
152 mecha10_messaging::MessageBus::connect(&cp_url, node_id)
153 .await
154 .map_err(|e| Mecha10Error::MessagingError {
155 message: format!("Failed to create bridge connection to control plane Redis: {}", e),
156 suggestion: "Check control plane Redis connection".to_string(),
157 })?;
158
159 // Convert from infra_config::BridgeConfig to redis_bridge::BridgeConfig
160 let redis_bridge_config = crate::redis_bridge::BridgeConfig {
161 enabled: bridge_config.enabled,
162 local_topics: bridge_config.local_topics,
163 bridged_topics: bridge_config.bridged_topics,
164 robot_id: bridge_config.robot_id,
165 };
166
167 let bridge =
168 crate::redis_bridge::RedisBridge::new(redis_bridge_config, bridge_local_bus, bridge_cp_bus);
169
170 // Start the bridge
171 if let Err(e) = bridge.start().await {
172 tracing::warn!("Failed to start Redis bridge: {}. Continuing without bridge.", e);
173 None
174 } else {
175 tracing::info!("Redis bridge started successfully");
176 Some(bridge)
177 }
178 } else {
179 tracing::debug!("Redis bridge is disabled in configuration");
180 None
181 }
182 } else {
183 None
184 };
185
186 (local_bus, cp_bus, bridge)
187 };
188
189 Ok(Self {
190 node_id: node_id.to_string(),
191 instance: None,
192 #[cfg(feature = "messaging")]
193 message_bus: Arc::new(RwLock::new(message_bus)),
194 #[cfg(feature = "messaging")]
195 control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
196 #[cfg(feature = "messaging")]
197 redis_bridge: Arc::new(RwLock::new(redis_bridge)),
198 shutdown: Arc::new(RwLock::new(false)),
199 state_manager: Arc::new(RwLock::new(None)),
200 })
201 }
202
203 /// Set an instance name for this context (builder pattern)
204 ///
205 /// This allows the same node implementation to run multiple instances
206 /// with different names. The node_id becomes "{node_name}/{instance}".
207 ///
208 /// # Example
209 ///
210 /// ```rust
211 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
212 /// // Single instance (no instance name)
213 /// let ctx = Context::new("camera_fake").await?;
214 /// // node_id: "camera_fake"
215 ///
216 /// // Multi-instance (with instance names)
217 /// let left_ctx = Context::new("camera_fake").await?.with_instance("left");
218 /// // node_id: "camera_fake/left"
219 ///
220 /// let right_ctx = Context::new("camera_fake").await?.with_instance("right");
221 /// // node_id: "camera_fake/right"
222 /// # Ok(())
223 /// # }
224 /// ```
225 pub fn with_instance(mut self, instance: &str) -> Self {
226 self.instance = Some(instance.to_string());
227 self.node_id = format!("{}/{}", self.node_id, instance);
228 self
229 }
230
231 /// Get the instance name if set
232 ///
233 /// Returns `None` for single-instance nodes, or `Some(instance)` for
234 /// multi-instance nodes.
235 pub fn instance(&self) -> Option<&str> {
236 self.instance.as_deref()
237 }
238
239 /// Load infrastructure configuration based on environment
240 ///
241 /// Priority order:
242 /// 1. mecha10.json (project configuration) - redis.url field
243 /// 2. Environment variables (MECHA10_REDIS_URL or REDIS_URL)
244 /// 3. config/infrastructure.<environment>.yaml
245 ///
246 /// Set via MECHA10_ENVIRONMENT environment variable.
247 fn load_infrastructure_config() -> Result<InfrastructureConfig> {
248 use crate::config::load_config;
249 use std::env;
250 use std::path::PathBuf;
251
252 // 1. Try loading from mecha10.json FIRST (project configuration)
253 if let Ok(redis_url) = Self::try_load_redis_from_mecha10_json() {
254 return Ok(InfrastructureConfig {
255 redis: crate::context::infra_config::RedisConfig {
256 url: redis_url,
257 max_connections: 10,
258 connection_timeout_ms: 5000,
259 },
260 control_plane_redis: None,
261 bridge: None,
262 postgres: None,
263 mlflow: None,
264 minio: None,
265 });
266 }
267
268 // 2. Check for environment variable overrides
269 // If REDIS_URL or MECHA10_REDIS_URL is set, use that and create minimal config
270 if let Ok(redis_url) = env::var("MECHA10_REDIS_URL").or_else(|_| env::var("REDIS_URL")) {
271 return Ok(InfrastructureConfig {
272 redis: crate::context::infra_config::RedisConfig {
273 url: redis_url,
274 max_connections: 10,
275 connection_timeout_ms: 5000,
276 },
277 control_plane_redis: None,
278 bridge: None,
279 postgres: None,
280 mlflow: None,
281 minio: None,
282 });
283 }
284
285 // 3. Load from infrastructure.yaml
286 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
287
288 let config_filename = format!("infrastructure.{}.yaml", environment);
289
290 // Try multiple paths in order:
291 // 1. Current directory (for running from workspace root)
292 // 2. Workspace root (for running tests from target/debug/deps)
293 // 3. Two levels up (for running tests from packages/core/target/debug/deps)
294 let paths_to_try = vec![
295 PathBuf::from(format!("config/{}", config_filename)),
296 PathBuf::from(format!("../../config/{}", config_filename)),
297 PathBuf::from(format!("../../../../config/{}", config_filename)),
298 ];
299
300 for path in &paths_to_try {
301 if path.exists() {
302 return load_config(path)
303 .with_context(|| {
304 format!(
305 "Failed to load infrastructure config for environment '{}' from '{}'",
306 environment,
307 path.display()
308 )
309 })
310 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
311 }
312 }
313
314 // If no config file found, return error with helpful message
315 Err(Mecha10Error::Configuration(format!(
316 "Failed to load infrastructure config for environment '{}'. \
317 Tried paths: {}. \
318 Make sure the file exists, set MECHA10_REDIS_URL environment variable, \
319 or configure redis.url in mecha10.json.",
320 environment,
321 paths_to_try
322 .iter()
323 .map(|p| p.display().to_string())
324 .collect::<Vec<_>>()
325 .join(", ")
326 )))
327 }
328
329 /// Try to load Redis URL from mecha10.json
330 ///
331 /// Looks for mecha10.json in current directory and reads redis.url field.
332 fn try_load_redis_from_mecha10_json() -> Result<String> {
333 use std::path::PathBuf;
334
335 // Try multiple paths to find mecha10.json
336 let paths_to_try = vec![
337 PathBuf::from("mecha10.json"),
338 PathBuf::from("../mecha10.json"),
339 PathBuf::from("../../mecha10.json"),
340 ];
341
342 for path in &paths_to_try {
343 if !path.exists() {
344 continue;
345 }
346
347 // Read and parse mecha10.json
348 let content = std::fs::read_to_string(path)
349 .map_err(|e| Mecha10Error::Configuration(format!("Failed to read {}: {}", path.display(), e)))?;
350
351 let json: serde_json::Value = serde_json::from_str(&content)
352 .map_err(|e| Mecha10Error::Configuration(format!("Failed to parse {}: {}", path.display(), e)))?;
353
354 // Extract redis.url
355 if let Some(redis_url) = json.get("redis").and_then(|r| r.get("url")).and_then(|u| u.as_str()) {
356 return Ok(redis_url.to_string());
357 }
358 }
359
360 Err(Mecha10Error::Configuration(
361 "No mecha10.json found with redis.url configuration".to_string(),
362 ))
363 }
364
365 /// Get Redis URL from infrastructure configuration (helper for internal use)
366 fn get_redis_url_from_config(config: &InfrastructureConfig) -> Result<String> {
367 // First try environment variables (for backward compatibility and testing)
368 if let Ok(url) = std::env::var("MECHA10_REDIS_URL") {
369 return Ok(url);
370 }
371 if let Ok(url) = std::env::var("REDIS_URL") {
372 return Ok(url);
373 }
374
375 // Use config
376 Ok(config.redis.url.clone())
377 }
378
379 /// Get Redis URL from infrastructure configuration
380 pub fn get_redis_url() -> Result<String> {
381 let config = Self::load_infrastructure_config()?;
382 Self::get_redis_url_from_config(&config)
383 }
384
385 /// Get the node ID
386 pub fn node_id(&self) -> &str {
387 &self.node_id
388 }
389
390 /// Load node configuration with environment-aware file-level fallback
391 ///
392 /// This method implements the V2 configuration system with:
393 /// - Environment-specific configs (dev/staging/production)
394 /// - File-level fallback (no key-by-key merging)
395 /// - Automatic path resolution
396 ///
397 /// **Fallback Priority:**
398 /// 1. `configs/{env}/nodes/{node-name}/config.json` (environment-specific)
399 /// 2. `configs/common/nodes/{node-name}/config.json` (common fallback)
400 /// 3. `T::default()` (code default)
401 ///
402 /// **Environment Detection:**
403 /// - Uses `MECHA10_ENVIRONMENT` env var (default: "dev")
404 /// - Supported values: "dev", "staging", "production", or custom
405 ///
406 /// # Type Parameters
407 ///
408 /// * `T` - Configuration type that implements `DeserializeOwned` + `Default`
409 ///
410 /// # Arguments
411 ///
412 /// * `node_name` - Name of the node (e.g., "simulation-bridge", "camera-streamer")
413 ///
414 /// # Returns
415 ///
416 /// The loaded configuration of type `T`, using the first available source
417 /// in the fallback chain.
418 ///
419 /// # Example
420 ///
421 /// ```rust
422 /// use mecha10::prelude::*;
423 /// use serde::{Deserialize, Serialize};
424 ///
425 /// #[derive(Debug, Clone, Deserialize, Serialize, Default)]
426 /// struct SimBridgeConfig {
427 /// camera_width: u32,
428 /// camera_height: u32,
429 /// camera_fps: u32,
430 /// }
431 ///
432 /// # async fn example(ctx: &Context) -> Result<()> {
433 /// // Loads from configs/{env}/nodes/simulation-bridge/config.json
434 /// // Falls back to configs/common/nodes/simulation-bridge/config.json
435 /// // Falls back to SimBridgeConfig::default()
436 /// let config: SimBridgeConfig = ctx.load_node_config("simulation-bridge").await?;
437 /// # Ok(())
438 /// # }
439 /// ```
440 pub async fn load_node_config<T>(&self, node_name: &str) -> Result<T>
441 where
442 T: DeserializeOwned + Default,
443 {
444 use crate::config::load_config;
445 use std::env;
446 use std::path::PathBuf;
447
448 // Get environment from env var (default: "dev")
449 let environment = env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "dev".to_string());
450
451 // Try multiple base paths (for running from different locations)
452 let base_paths = vec![
453 PathBuf::from("."), // Current directory
454 PathBuf::from("../.."), // From target/debug/deps
455 PathBuf::from("../../../.."), // From packages/*/target/debug/deps
456 ];
457
458 // Build fallback chain for each base path
459 for base_path in &base_paths {
460 // 1. Try environment-specific config
461 let env_config_path = base_path
462 .join("configs")
463 .join(&environment)
464 .join("nodes")
465 .join(node_name)
466 .join("config.json");
467
468 if env_config_path.exists() {
469 tracing::debug!(
470 "Loading node config for '{}' from environment-specific path: {}",
471 node_name,
472 env_config_path.display()
473 );
474 return load_config(&env_config_path)
475 .with_context(|| {
476 format!(
477 "Failed to load node config for '{}' from '{}'",
478 node_name,
479 env_config_path.display()
480 )
481 })
482 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
483 }
484
485 // 2. Try common config
486 let common_config_path = base_path
487 .join("configs")
488 .join("common")
489 .join("nodes")
490 .join(node_name)
491 .join("config.json");
492
493 if common_config_path.exists() {
494 tracing::debug!(
495 "Loading node config for '{}' from common path: {}",
496 node_name,
497 common_config_path.display()
498 );
499 return load_config(&common_config_path)
500 .with_context(|| {
501 format!(
502 "Failed to load node config for '{}' from '{}'",
503 node_name,
504 common_config_path.display()
505 )
506 })
507 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)));
508 }
509 }
510
511 // 3. Fall back to default
512 tracing::debug!(
513 "No config file found for node '{}', using Default implementation",
514 node_name
515 );
516 Ok(T::default())
517 }
518
519 /// Check if control plane Redis connection is available
520 #[cfg(feature = "messaging")]
521 pub async fn has_control_plane(&self) -> bool {
522 self.control_plane_bus.read().await.is_some()
523 }
524
525 /// Get control plane Redis URL from infrastructure configuration
526 pub fn get_control_plane_redis_url() -> Result<Option<String>> {
527 let config = Self::load_infrastructure_config()?;
528 Ok(config.control_plane_redis.map(|c| c.url))
529 }
530
531 /// Subscribe to a topic with type-safe message handling
532 ///
533 /// # Arguments
534 ///
535 /// * `topic` - Type-safe topic to subscribe to
536 ///
537 /// # Returns
538 ///
539 /// A receiver that will yield messages of type `T`
540 ///
541 /// # Example
542 ///
543 /// ```rust
544 /// use mecha10::prelude::*;
545 /// use mecha10::topics::sensor;
546 ///
547 /// # async fn example(ctx: &Context) -> Result<()> {
548 /// // Type is inferred as Receiver<Image>
549 /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
550 ///
551 /// while let Some(image) = images.recv().await {
552 /// println!("Image: {}x{}", image.width, image.height);
553 /// }
554 /// # Ok(())
555 /// # }
556 /// ```
557 #[cfg(feature = "messaging")]
558 pub async fn subscribe<T>(&self, topic: Topic<T>) -> Result<Receiver<T>>
559 where
560 T: Message + DeserializeOwned + Send + 'static,
561 {
562 let mut bus = self.message_bus.write().await;
563
564 // Create a unique consumer group for this node
565 let consumer_group = format!("{}-{}", self.node_id, topic.path().replace('/', "-"));
566
567 let subscriber = bus
568 .subscribe::<T>(topic.path(), &consumer_group)
569 .await
570 .with_context(|| {
571 format!(
572 "Failed to subscribe to topic '{}' with consumer group '{}' from node '{}'",
573 topic.path(),
574 consumer_group,
575 self.node_id
576 )
577 })
578 .map_err(|e| Mecha10Error::MessagingError {
579 message: format!("{:#}", e),
580 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic.path()),
581 })?;
582
583 // Convert to our Receiver type
584 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
585
586 // Spawn task to forward messages
587 tokio::spawn(async move {
588 let mut sub = subscriber;
589 while let Some(msg) = sub.recv().await {
590 // Auto-acknowledge message before moving payload
591 if let Err(e) = msg.ack().await {
592 tracing::warn!(
593 topic = %msg.topic,
594 message_id = %msg.id,
595 error = %e,
596 "Failed to acknowledge message (will be redelivered on reconnect)"
597 );
598 }
599 // Extract payload and forward
600 if tx.send(msg.payload).is_err() {
601 break; // Receiver dropped
602 }
603 }
604 });
605
606 Ok(Receiver::from(rx))
607 }
608
609 /// Publish a message to a specific topic
610 ///
611 /// # Arguments
612 ///
613 /// * `topic` - Type-safe topic to publish to
614 /// * `message` - Message to publish (type checked against topic)
615 ///
616 /// # Example
617 ///
618 /// ```rust
619 /// use mecha10::prelude::*;
620 /// use mecha10::topics::sensor;
621 /// use mecha10::messages::Image;
622 ///
623 /// # async fn example(ctx: &Context) -> Result<()> {
624 /// let image = Image {
625 /// timestamp: now_micros(),
626 /// width: 640,
627 /// height: 480,
628 /// encoding: "rgb8".to_string(),
629 /// data: vec![],
630 /// };
631 ///
632 /// // Type checked: image must be of type Image
633 /// ctx.publish_to(sensor::CAMERA_RGB, &image).await?;
634 /// # Ok(())
635 /// # }
636 /// ```
637 #[cfg(feature = "messaging")]
638 pub async fn publish_to<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
639 where
640 T: Message + Serialize,
641 {
642 let mut bus = self.message_bus.write().await;
643
644 bus.publish(topic.path(), message)
645 .await
646 .with_context(|| {
647 format!(
648 "Failed to publish to topic '{}' from node '{}'",
649 topic.path(),
650 self.node_id
651 )
652 })
653 .map_err(|e| Mecha10Error::MessagingError {
654 message: format!("{:#}", e),
655 suggestion: "Check that Redis is running and accepting connections".to_string(),
656 })?;
657
658 Ok(())
659 }
660
661 /// Publish a message to a dynamic topic path (for runtime-generated topics like camera streams)
662 ///
663 /// This is useful when the topic path is determined at runtime and cannot be a `&'static str`.
664 ///
665 /// # Arguments
666 /// * `topic_path` - The topic path as a String (e.g., "robot/sensors/camera/front/compressed")
667 /// * `message` - The message to publish
668 pub async fn publish_to_path<T>(&self, topic_path: &str, message: &T) -> Result<()>
669 where
670 T: Message + Serialize,
671 {
672 let mut bus = self.message_bus.write().await;
673
674 bus.publish(topic_path, message)
675 .await
676 .with_context(|| {
677 format!(
678 "Failed to publish to topic '{}' from node '{}'",
679 topic_path, self.node_id
680 )
681 })
682 .map_err(|e| Mecha10Error::MessagingError {
683 message: format!("{:#}", e),
684 suggestion: "Check that Redis is running and accepting connections".to_string(),
685 })?;
686
687 Ok(())
688 }
689
690 /// Publish a message to an instance-scoped topic
691 ///
692 /// This automatically appends the context's instance name to the topic path.
693 /// If no instance is set, this behaves like `publish_to()`.
694 ///
695 /// # Arguments
696 ///
697 /// * `topic` - Type-safe topic to publish to
698 /// * `message` - Message to publish (type checked against topic)
699 ///
700 /// # Example
701 ///
702 /// ```rust
703 /// use mecha10::prelude::*;
704 /// use mecha10::topics::sensor;
705 /// use mecha10::messages::Image;
706 ///
707 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
708 /// let ctx = Context::new("camera_fake").await?.with_instance("left");
709 ///
710 /// let image = Image {
711 /// timestamp: now_micros(),
712 /// width: 640,
713 /// height: 480,
714 /// encoding: "rgb8".to_string(),
715 /// data: vec![],
716 /// };
717 ///
718 /// // Publishes to "/sensor/camera/rgb/left"
719 /// ctx.publish_to_scoped(sensor::CAMERA_RGB, &image).await?;
720 /// # Ok(())
721 /// # }
722 /// ```
723 #[cfg(feature = "messaging")]
724 pub async fn publish_to_scoped<T>(&self, topic: Topic<T>, message: &T) -> Result<()>
725 where
726 T: Message + Serialize,
727 {
728 let scoped_path = if let Some(instance) = &self.instance {
729 format!("{}/{}", topic.path(), instance)
730 } else {
731 topic.path().to_string()
732 };
733
734 self.publish_raw(&scoped_path, message).await
735 }
736
737 /// Publish a message to a topic with an explicit instance suffix
738 ///
739 /// This allows publishing to a specific instance's topic regardless of
740 /// the context's own instance name.
741 ///
742 /// # Arguments
743 ///
744 /// * `topic` - Type-safe topic to publish to
745 /// * `instance` - Instance name to append to topic path
746 /// * `message` - Message to publish (type checked against topic)
747 ///
748 /// # Example
749 ///
750 /// ```rust
751 /// use mecha10::prelude::*;
752 /// use mecha10::topics::actuator;
753 ///
754 /// # async fn example(ctx: &Context) -> Result<()> {
755 /// let cmd = MotorCommand { velocity: 1.0, torque: 0.0 };
756 ///
757 /// // Send to specific motor instances
758 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_left", &cmd).await?;
759 /// ctx.publish_to_instance(actuator::MOTOR_CMD, "front_right", &cmd).await?;
760 /// # Ok(())
761 /// # }
762 /// ```
763 #[cfg(feature = "messaging")]
764 pub async fn publish_to_instance<T>(&self, topic: Topic<T>, instance: &str, message: &T) -> Result<()>
765 where
766 T: Message + Serialize,
767 {
768 let instance_path = format!("{}/{}", topic.path(), instance);
769 self.publish_raw(&instance_path, message).await
770 }
771
772 /// Subscribe to an instance-scoped topic
773 ///
774 /// This automatically appends the context's instance name to the topic path.
775 /// If no instance is set, this behaves like `subscribe()`.
776 ///
777 /// # Example
778 ///
779 /// ```rust
780 /// use mecha10::prelude::*;
781 /// use mecha10::topics::sensor;
782 ///
783 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
784 /// let ctx = Context::new("vision_node").await?;
785 ///
786 /// // Subscribe to left camera (topic: "/sensor/camera/rgb/left")
787 /// let mut left_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "left").await?;
788 ///
789 /// // Subscribe to right camera (topic: "/sensor/camera/rgb/right")
790 /// let mut right_images = ctx.subscribe_scoped(sensor::CAMERA_RGB, "right").await?;
791 /// # Ok(())
792 /// # }
793 /// ```
794 #[cfg(feature = "messaging")]
795 pub async fn subscribe_scoped<T>(&self, topic: Topic<T>, instance: &str) -> Result<Receiver<T>>
796 where
797 T: Message + DeserializeOwned + Send + 'static,
798 {
799 let scoped_path = format!("{}/{}", topic.path(), instance);
800 self.subscribe_raw(&scoped_path).await
801 }
802
803 /// Subscribe to a topic using a raw string path (for RPC and advanced use cases)
804 ///
805 /// # Arguments
806 ///
807 /// * `topic_path` - Raw topic path string
808 ///
809 /// # Returns
810 ///
811 /// A receiver that will yield messages of type `T`
812 ///
813 /// # Note
814 ///
815 /// This is a low-level method. Prefer using `subscribe()` with type-safe topics
816 /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
817 #[cfg(feature = "messaging")]
818 pub async fn subscribe_raw<T>(&self, topic_path: &str) -> Result<Receiver<T>>
819 where
820 T: Message + DeserializeOwned + Send + 'static,
821 {
822 let mut bus = self.message_bus.write().await;
823
824 // Create a unique consumer group for this node
825 let consumer_group = format!("{}-{}", self.node_id, topic_path.replace('/', "-"));
826
827 let subscriber = bus
828 .subscribe::<T>(topic_path, &consumer_group)
829 .await
830 .with_context(|| {
831 format!(
832 "Failed to subscribe to raw topic '{}' with consumer group '{}' from node '{}'",
833 topic_path, consumer_group, self.node_id
834 )
835 })
836 .map_err(|e| Mecha10Error::MessagingError {
837 message: format!("{:#}", e),
838 suggestion: format!("Check that Redis is running and topic '{}' is valid", topic_path),
839 })?;
840
841 // Convert to our Receiver type
842 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
843
844 // Spawn task to forward messages
845 tokio::spawn(async move {
846 let mut sub = subscriber;
847 while let Some(msg) = sub.recv().await {
848 // Auto-acknowledge message before moving payload
849 if let Err(e) = msg.ack().await {
850 tracing::warn!(
851 topic = %msg.topic,
852 message_id = %msg.id,
853 error = %e,
854 "Failed to acknowledge message (will be redelivered on reconnect)"
855 );
856 }
857 // Extract payload and forward
858 if tx.send(msg.payload).is_err() {
859 break; // Receiver dropped
860 }
861 }
862 });
863
864 Ok(Receiver::from(rx))
865 }
866
867 /// Discover topics matching a glob pattern
868 ///
869 /// Queries Redis for all topics matching the given pattern without subscribing.
870 /// Useful for inspecting available topics before subscribing.
871 ///
872 /// # Pattern Syntax
873 ///
874 /// - `*` matches any sequence of characters within a path segment
875 /// - `**` matches any sequence of path segments
876 /// - `{a,b}` matches either a or b (future enhancement)
877 ///
878 /// # Arguments
879 ///
880 /// * `pattern` - Glob pattern to match topics against
881 ///
882 /// # Returns
883 ///
884 /// A vector of topic paths that match the pattern
885 ///
886 /// # Example
887 ///
888 /// ```rust,no_run
889 /// use mecha10::prelude::*;
890 ///
891 /// # async fn example(ctx: &Context) -> Result<()> {
892 /// // Discover all camera topics
893 /// let topics = ctx.discover_topics("/sensor/camera/*").await?;
894 /// println!("Found {} camera topics: {:?}", topics.len(), topics);
895 /// # Ok(())
896 /// # }
897 /// ```
898 #[cfg(feature = "messaging")]
899 pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
900 use crate::aggregated::pattern::convert_glob_to_redis;
901
902 let bus = self.message_bus.read().await;
903 let redis_pattern = convert_glob_to_redis(pattern);
904
905 bus.discover_topics(&redis_pattern)
906 .await
907 .with_context(|| {
908 format!(
909 "Failed to discover topics matching pattern '{}' (redis pattern: '{}') from node '{}'",
910 pattern, redis_pattern, self.node_id
911 )
912 })
913 .map_err(|e| Mecha10Error::MessagingError {
914 message: format!("{:#}", e),
915 suggestion: "Check that Redis is running and the pattern is valid".to_string(),
916 })
917 }
918
919 /// Subscribe to multiple topics matching a glob pattern
920 ///
921 /// Returns an aggregated receiver that multiplexes all matching topics into a single stream.
922 /// Each message is tagged with its source topic path.
923 ///
924 /// # Pattern Syntax
925 ///
926 /// - `*` matches any sequence of characters within a path segment
927 /// - `**` matches any sequence of path segments
928 /// - `{a,b}` matches either a or b (future enhancement)
929 ///
930 /// # Arguments
931 ///
932 /// * `pattern` - Glob pattern to match topics against
933 ///
934 /// # Returns
935 ///
936 /// An `AggregatedReceiver<T>` that yields `(topic_path, message)` tuples
937 ///
938 /// # Examples
939 ///
940 /// ```rust,no_run
941 /// use mecha10::prelude::*;
942 ///
943 /// # async fn example(ctx: &Context) -> Result<()> {
944 /// // Subscribe to all cameras with one call
945 /// let mut cameras = ctx.subscribe_aggregated::<ImageMessage>("/sensor/camera/*").await?;
946 ///
947 /// while let Some((topic, image)) = cameras.recv().await {
948 /// let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
949 /// println!("Camera {}: {}x{}", camera_name, image.width, image.height);
950 /// }
951 /// # Ok(())
952 /// # }
953 /// ```
954 #[cfg(feature = "messaging")]
955 pub async fn subscribe_aggregated<T>(&self, pattern: &str) -> Result<crate::aggregated::AggregatedReceiver<T>>
956 where
957 T: Message + DeserializeOwned + Send + 'static,
958 {
959 // 1. Discover all topics matching pattern
960 let matching_topics = self.discover_topics(pattern).await?;
961
962 if matching_topics.is_empty() {
963 return Err(Mecha10Error::MessagingError {
964 message: format!("No topics found matching pattern: {}", pattern),
965 suggestion: "Check that the pattern is correct and topics exist".to_string(),
966 });
967 }
968
969 // 2. Subscribe to each topic
970 let mut receivers = Vec::new();
971 for topic_path in matching_topics {
972 let rx = self.subscribe_raw::<T>(&topic_path).await?;
973 // Extract receiver and wrap in ReceiverType enum
974 let receiver_type = match rx.inner {
975 crate::context::receiver::ReceiverInner::Unbounded(unbounded) => {
976 crate::aggregated::ReceiverType::Unbounded(unbounded)
977 }
978 crate::context::receiver::ReceiverInner::Bounded(bounded) => {
979 crate::aggregated::ReceiverType::Bounded(bounded)
980 }
981 };
982 receivers.push((topic_path.clone(), receiver_type));
983 }
984
985 // 3. Return aggregated receiver with support for both channel types
986 Ok(crate::aggregated::AggregatedReceiver::new_mixed(
987 pattern.to_string(),
988 receivers,
989 ))
990 }
991
992 /// Publish a message to a topic using a raw string path (for RPC and advanced use cases)
993 ///
994 /// # Arguments
995 ///
996 /// * `topic_path` - Raw topic path string
997 /// * `message` - Message to publish
998 ///
999 /// # Note
1000 ///
1001 /// This is a low-level method. Prefer using `publish_to()` with type-safe topics
1002 /// for normal use cases. This method is primarily for RPC and dynamic topic handling.
1003 #[cfg(feature = "messaging")]
1004 pub async fn publish_raw<T>(&self, topic_path: &str, message: &T) -> Result<()>
1005 where
1006 T: Message + Serialize,
1007 {
1008 let mut bus = self.message_bus.write().await;
1009
1010 bus.publish(topic_path, message)
1011 .await
1012 .with_context(|| {
1013 format!(
1014 "Failed to publish to raw topic '{}' from node '{}'",
1015 topic_path, self.node_id
1016 )
1017 })
1018 .map_err(|e| Mecha10Error::MessagingError {
1019 message: format!("{:#}", e),
1020 suggestion: "Check that Redis is running and accepting connections".to_string(),
1021 })?;
1022
1023 Ok(())
1024 }
1025
1026 /// Check if shutdown has been requested
1027 pub async fn is_shutdown(&self) -> bool {
1028 *self.shutdown.read().await
1029 }
1030
1031 /// Request shutdown
1032 pub async fn shutdown(&self) {
1033 *self.shutdown.write().await = true;
1034
1035 // Stop Redis bridge if running
1036 #[cfg(feature = "messaging")]
1037 {
1038 let bridge_opt = self.redis_bridge.read().await;
1039 if let Some(bridge) = bridge_opt.as_ref() {
1040 if let Err(e) = bridge.stop().await {
1041 tracing::warn!("Failed to stop Redis bridge during shutdown: {}", e);
1042 }
1043 }
1044 }
1045 }
1046
1047 /// Wait for shutdown signal
1048 pub async fn wait_for_shutdown(&self) {
1049 loop {
1050 if self.is_shutdown().await {
1051 break;
1052 }
1053 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1054 }
1055 }
1056
1057 /// Enter a logging span with this node's context
1058 ///
1059 /// This returns a guard that, when entered, adds the node_id to all log messages.
1060 /// The span is automatically exited when the guard is dropped.
1061 ///
1062 /// # Example
1063 ///
1064 /// ```rust
1065 /// use mecha10::prelude::*;
1066 ///
1067 /// # async fn example(ctx: &Context) -> Result<()> {
1068 /// let _guard = ctx.logging_span().entered();
1069 /// info!("This message includes node_id automatically");
1070 /// debug!("So does this one");
1071 /// # Ok(())
1072 /// # }
1073 /// ```
1074 pub fn logging_span(&self) -> tracing::Span {
1075 tracing::info_span!("node", node_id = %self.node_id)
1076 }
1077
1078 /// Subscribe to all camera topics
1079 ///
1080 /// Convenience method for `subscribe_aggregated::<Image>("/sensor/camera/*")`
1081 ///
1082 /// # Example
1083 ///
1084 /// ```rust,no_run
1085 /// use mecha10::prelude::*;
1086 ///
1087 /// # async fn example(ctx: &Context) -> Result<()> {
1088 /// let mut cameras = ctx.subscribe_all_cameras().await?;
1089 /// while let Some((topic, image)) = cameras.recv().await {
1090 /// let camera_name = topic.rsplit('/').next().unwrap_or("unknown");
1091 /// println!("Camera {}: {}x{}", camera_name, image.width, image.height);
1092 /// }
1093 /// # Ok(())
1094 /// # }
1095 /// ```
1096 #[cfg(feature = "messaging")]
1097 pub async fn subscribe_all_cameras(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::Image>> {
1098 self.subscribe_aggregated("/sensor/camera/*").await
1099 }
1100
1101 /// Subscribe to all sensor topics
1102 ///
1103 /// Subscribes to all sensor data including cameras, LiDAR, IMU, odometry, etc.
1104 ///
1105 /// # Example
1106 ///
1107 /// ```rust,no_run
1108 /// use mecha10::prelude::*;
1109 ///
1110 /// # async fn example(ctx: &Context) -> Result<()> {
1111 /// // Subscribe to all sensors (returns untyped messages)
1112 /// let topics = ctx.discover_topics("/sensor/**").await?;
1113 /// println!("Found {} sensor topics", topics.len());
1114 /// # Ok(())
1115 /// # }
1116 /// ```
1117 #[cfg(feature = "messaging")]
1118 pub async fn discover_all_sensors(&self) -> Result<Vec<String>> {
1119 self.discover_topics("/sensor/**").await
1120 }
1121
1122 /// Subscribe to all LiDAR topics
1123 ///
1124 /// Convenience method for `subscribe_aggregated::<LaserScan>("/sensor/lidar/*")`
1125 #[cfg(feature = "messaging")]
1126 pub async fn subscribe_all_lidars(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::LaserScan>> {
1127 self.subscribe_aggregated("/sensor/lidar/*").await
1128 }
1129
1130 /// Subscribe to all IMU topics
1131 ///
1132 /// Convenience method for `subscribe_aggregated::<IMU>("/sensor/imu/*")`
1133 #[cfg(feature = "messaging")]
1134 pub async fn subscribe_all_imus(&self) -> Result<crate::aggregated::AggregatedReceiver<crate::types::IMU>> {
1135 self.subscribe_aggregated("/sensor/imu/*").await
1136 }
1137
1138 /// Subscribe to all motor topics
1139 ///
1140 /// Convenience method for `subscribe_aggregated::<MotorStatus>("/actuator/motor/*")`
1141 #[cfg(feature = "messaging")]
1142 pub async fn subscribe_all_motors(
1143 &self,
1144 ) -> Result<crate::aggregated::AggregatedReceiver<crate::actuator::MotorStatus>> {
1145 self.subscribe_aggregated("/actuator/motor/*").await
1146 }
1147
1148 /// Subscribe to a topic pattern with wildcard
1149 ///
1150 /// Generic helper for subscribing to multiple topics matching a pattern.
1151 ///
1152 /// # Example
1153 ///
1154 /// ```rust,no_run
1155 /// use mecha10::prelude::*;
1156 /// use mecha10::topics::sensor;
1157 ///
1158 /// # async fn example(ctx: &Context) -> Result<()> {
1159 /// // Subscribe to all instances of a specific topic type
1160 /// let mut cameras = ctx.subscribe_pattern(sensor::CAMERA_RGB, "*").await?;
1161 /// # Ok(())
1162 /// # }
1163 /// ```
1164 #[cfg(feature = "messaging")]
1165 pub async fn subscribe_pattern<T>(
1166 &self,
1167 topic: crate::topics::Topic<T>,
1168 pattern: &str,
1169 ) -> Result<crate::aggregated::AggregatedReceiver<T>>
1170 where
1171 T: Message + DeserializeOwned + Send + 'static,
1172 {
1173 let full_pattern = format!("{}/{}", topic.path(), pattern);
1174 self.subscribe_aggregated(&full_pattern).await
1175 }
1176
1177 /// Get or create a state manager for this node
1178 ///
1179 /// The state manager provides persistent storage for node state across restarts.
1180 /// By default, it uses filesystem storage in `./state/` directory.
1181 ///
1182 /// # Example
1183 ///
1184 /// ```rust,no_run
1185 /// use mecha10::prelude::*;
1186 /// use serde::{Deserialize, Serialize};
1187 ///
1188 /// #[derive(Debug, Clone, Serialize, Deserialize)]
1189 /// struct MyState {
1190 /// counter: u64,
1191 /// position: (f64, f64),
1192 /// }
1193 ///
1194 /// # async fn example(ctx: &Context) -> Result<()> {
1195 /// let state_mgr = ctx.state_manager().await?;
1196 ///
1197 /// // Load previous state
1198 /// let mut state = state_mgr
1199 /// .load::<MyState>("my_node")
1200 /// .await?
1201 /// .unwrap_or(MyState {
1202 /// counter: 0,
1203 /// position: (0.0, 0.0),
1204 /// });
1205 ///
1206 /// // Update state
1207 /// state.counter += 1;
1208 ///
1209 /// // Save state
1210 /// state_mgr.save("my_node", &state).await?;
1211 /// # Ok(())
1212 /// # }
1213 /// ```
1214 pub async fn state_manager(&self) -> Result<Arc<crate::state::ConcreteStateManager>> {
1215 let mut mgr = self.state_manager.write().await;
1216
1217 if let Some(existing) = mgr.as_ref() {
1218 return Ok(Arc::clone(existing));
1219 }
1220
1221 // Create default filesystem-based state manager
1222 let state_dir = std::env::var("MECHA10_STATE_DIR").unwrap_or_else(|_| "./state".to_string());
1223 let fs_mgr = FilesystemStateManager::new(&state_dir).await?;
1224 let arc_mgr = Arc::new(crate::state::ConcreteStateManager::Filesystem(fs_mgr));
1225
1226 *mgr = Some(Arc::clone(&arc_mgr));
1227
1228 Ok(arc_mgr)
1229 }
1230
1231 /// Set a custom state manager for this context
1232 ///
1233 /// This allows using alternative backends (Redis, PostgreSQL, etc.) or custom implementations.
1234 ///
1235 /// # Example
1236 ///
1237 /// ```rust,no_run
1238 /// use mecha10::prelude::*;
1239 /// use std::sync::Arc;
1240 ///
1241 /// # async fn example(ctx: &Context) -> Result<()> {
1242 /// // Use in-memory state manager for testing
1243 /// let mem_mgr = Arc::new(MemoryStateManager::new());
1244 /// ctx.set_state_manager(mem_mgr).await;
1245 /// # Ok(())
1246 /// # }
1247 /// ```
1248 pub async fn set_state_manager(&self, manager: Arc<crate::state::ConcreteStateManager>) {
1249 let mut mgr = self.state_manager.write().await;
1250 *mgr = Some(manager);
1251 }
1252
1253 // ========================================
1254 // Topic Discovery Helpers
1255 // ========================================
1256
1257 /// List all sensor topics currently active in the system
1258 ///
1259 /// Returns all topics under `/sensor/*` including cameras, LiDAR, IMU, GPS, etc.
1260 ///
1261 /// # Example
1262 ///
1263 /// ```rust,no_run
1264 /// use mecha10::prelude::*;
1265 ///
1266 /// # async fn example(ctx: &Context) -> Result<()> {
1267 /// let sensors = ctx.list_sensor_topics().await?;
1268 /// println!("Available sensors:");
1269 /// for topic in sensors {
1270 /// println!(" - {}", topic);
1271 /// }
1272 /// # Ok(())
1273 /// # }
1274 /// ```
1275 #[cfg(feature = "messaging")]
1276 pub async fn list_sensor_topics(&self) -> Result<Vec<String>> {
1277 self.discover_topics("/sensor/*").await
1278 }
1279
1280 /// List all actuator topics currently active in the system
1281 ///
1282 /// Returns all topics under `/actuator/*` including motors, servos, grippers, etc.
1283 ///
1284 /// # Example
1285 ///
1286 /// ```rust,no_run
1287 /// use mecha10::prelude::*;
1288 ///
1289 /// # async fn example(ctx: &Context) -> Result<()> {
1290 /// let actuators = ctx.list_actuator_topics().await?;
1291 /// println!("Available actuators:");
1292 /// for topic in actuators {
1293 /// println!(" - {}", topic);
1294 /// }
1295 /// # Ok(())
1296 /// # }
1297 /// ```
1298 #[cfg(feature = "messaging")]
1299 pub async fn list_actuator_topics(&self) -> Result<Vec<String>> {
1300 self.discover_topics("/actuator/*").await
1301 }
1302
1303 /// List all topics in the system
1304 ///
1305 /// Returns all active topics regardless of category.
1306 ///
1307 /// # Example
1308 ///
1309 /// ```rust,no_run
1310 /// use mecha10::prelude::*;
1311 ///
1312 /// # async fn example(ctx: &Context) -> Result<()> {
1313 /// let all_topics = ctx.list_all_topics().await?;
1314 /// println!("All active topics ({} total):", all_topics.len());
1315 /// for topic in all_topics {
1316 /// println!(" - {}", topic);
1317 /// }
1318 /// # Ok(())
1319 /// # }
1320 /// ```
1321 #[cfg(feature = "messaging")]
1322 pub async fn list_all_topics(&self) -> Result<Vec<String>> {
1323 self.discover_topics("*").await
1324 }
1325
1326 /// List all camera topics
1327 ///
1328 /// Returns all topics under `/sensor/camera/*`.
1329 ///
1330 /// # Example
1331 ///
1332 /// ```rust,no_run
1333 /// use mecha10::prelude::*;
1334 ///
1335 /// # async fn example(ctx: &Context) -> Result<()> {
1336 /// let cameras = ctx.list_camera_topics().await?;
1337 /// println!("Available cameras:");
1338 /// for topic in cameras {
1339 /// println!(" - {}", topic);
1340 /// }
1341 /// # Ok(())
1342 /// # }
1343 /// ```
1344 #[cfg(feature = "messaging")]
1345 pub async fn list_camera_topics(&self) -> Result<Vec<String>> {
1346 self.discover_topics("/sensor/camera/*").await
1347 }
1348
1349 /// List all LiDAR topics
1350 ///
1351 /// Returns all topics under `/sensor/lidar/*`.
1352 #[cfg(feature = "messaging")]
1353 pub async fn list_lidar_topics(&self) -> Result<Vec<String>> {
1354 self.discover_topics("/sensor/lidar/*").await
1355 }
1356
1357 /// List all IMU topics
1358 ///
1359 /// Returns all topics under `/sensor/imu/*`.
1360 #[cfg(feature = "messaging")]
1361 pub async fn list_imu_topics(&self) -> Result<Vec<String>> {
1362 self.discover_topics("/sensor/imu/*").await
1363 }
1364
1365 /// List all motor topics
1366 ///
1367 /// Returns all topics under `/actuator/motor/*`.
1368 #[cfg(feature = "messaging")]
1369 pub async fn list_motor_topics(&self) -> Result<Vec<String>> {
1370 self.discover_topics("/actuator/motor/*").await
1371 }
1372
1373 /// Print a formatted summary of all active topics
1374 ///
1375 /// Useful for debugging and introspection. Groups topics by category
1376 /// and displays them in a human-readable format.
1377 ///
1378 /// # Example
1379 ///
1380 /// ```rust,no_run
1381 /// use mecha10::prelude::*;
1382 ///
1383 /// # async fn example(ctx: &Context) -> Result<()> {
1384 /// ctx.print_topic_summary().await?;
1385 /// // Output:
1386 /// // === Active Topics ===
1387 /// // Sensors (3):
1388 /// // - /sensor/camera/front
1389 /// // - /sensor/lidar/main
1390 /// // - /sensor/imu/base
1391 /// // Actuators (2):
1392 /// // - /actuator/motor/left
1393 /// // - /actuator/motor/right
1394 /// // =====================
1395 /// # Ok(())
1396 /// # }
1397 /// ```
1398 #[cfg(feature = "messaging")]
1399 pub async fn print_topic_summary(&self) -> Result<()> {
1400 let sensors = self.list_sensor_topics().await.unwrap_or_default();
1401 let actuators = self.list_actuator_topics().await.unwrap_or_default();
1402 let all_topics = self.list_all_topics().await?;
1403
1404 let other_topics: Vec<String> = all_topics
1405 .into_iter()
1406 .filter(|t| !t.starts_with("/sensor/") && !t.starts_with("/actuator/"))
1407 .collect();
1408
1409 println!("=== Active Topics ===");
1410
1411 if !sensors.is_empty() {
1412 println!("Sensors ({}):", sensors.len());
1413 for topic in sensors {
1414 println!(" - {}", topic);
1415 }
1416 }
1417
1418 if !actuators.is_empty() {
1419 println!("Actuators ({}):", actuators.len());
1420 for topic in actuators {
1421 println!(" - {}", topic);
1422 }
1423 }
1424
1425 if !other_topics.is_empty() {
1426 println!("Other ({}):", other_topics.len());
1427 for topic in other_topics {
1428 println!(" - {}", topic);
1429 }
1430 }
1431
1432 println!("=====================");
1433
1434 Ok(())
1435 }
1436}