mecha10_messaging/lib.rs
1//! Mecha10 Messaging Layer
2//!
3//! This crate provides a Redis Streams-based pub/sub messaging system for
4//! inter-node communication in the Mecha10 framework.
5//!
6//! # Features
7//!
8//! - Type-safe message serialization/deserialization
9//! - Redis Streams backend for reliable message delivery
10//! - Topic-based routing
11//! - Consumer groups for load balancing
12//! - Message acknowledgment and retry
13//! - Automatic reconnection
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10_messaging::{MessageBus, Message};
19//! use serde::{Deserialize, Serialize};
20//!
21//! #[derive(Debug, Serialize, Deserialize)]
22//! struct LaserScan {
23//! ranges: Vec<f32>,
24//! timestamp: u64,
25//! }
26//!
27//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//! let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
29//!
30//! // Subscribe to topic
31//! let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
32//!
33//! // Publish message
34//! let scan = LaserScan { ranges: vec![1.0, 2.0, 3.0], timestamp: 12345 };
35//! bus.publish("/scan", &scan).await?;
36//!
37//! // Receive message
38//! if let Some(msg) = rx.recv().await {
39//! println!("Received scan: {:?}", msg.payload);
40//! msg.ack().await?;
41//! }
42//! # Ok(())
43//! # }
44//! ```
45
46use redis::aio::MultiplexedConnection;
47use redis::{AsyncCommands, Client, RedisError};
48use serde::{de::DeserializeOwned, Deserialize, Serialize};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::time::Duration;
52use thiserror::Error;
53use tokio::sync::{mpsc, RwLock};
54
/// Messaging errors
///
/// Unified error type for every operation in this crate. Redis and
/// serde_json errors convert automatically via `#[from]`, so `?` works
/// on both throughout the crate.
#[derive(Debug, Error)]
pub enum MessagingError {
    /// Underlying Redis client/protocol failure.
    #[error("Redis error: {0}")]
    Redis(#[from] RedisError),

    /// JSON (de)serialization of a message envelope or payload failed.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Could not establish a connection (e.g. retry attempts exhausted).
    #[error("Connection error: {0}")]
    Connection(String),

    /// Consumer-group creation or subscription bookkeeping failed.
    #[error("Subscription error: {0}")]
    Subscription(String),

    /// The internal channel between a background reader task and its
    /// `Subscriber` was closed.
    #[error("Channel closed")]
    ChannelClosed,
}
73
/// Result type for messaging operations
///
/// Shorthand for `std::result::Result<T, MessagingError>` used across
/// this crate's public API.
pub type Result<T> = std::result::Result<T, MessagingError>;
76
/// Configuration for connection retry logic
///
/// Delays grow geometrically: after each failed attempt the previous
/// delay is multiplied by `multiplier`, capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of connection attempts
    pub max_attempts: usize,
    /// Initial delay between retries
    pub initial_delay: Duration,
    /// Maximum delay between retries
    pub max_delay: Duration,
    /// Multiplier for exponential backoff
    pub multiplier: f64,
}
89
90impl Default for RetryConfig {
91 fn default() -> Self {
92 Self {
93 max_attempts: 5,
94 initial_delay: Duration::from_millis(100),
95 max_delay: Duration::from_secs(10),
96 multiplier: 2.0,
97 }
98 }
99}
100
/// A message envelope containing metadata and payload
///
/// `T` is the user payload type. The two `#[serde(skip)]` fields are
/// runtime-only handles injected by the subscription task so that
/// [`Message::ack`] can acknowledge over a shared connection; they are
/// `None` on messages that were not produced by a subscription.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message<T> {
    /// Message ID (Redis Stream ID)
    pub id: String,

    /// Topic the message was published to
    pub topic: String,

    /// Publisher node ID
    pub publisher: String,

    /// Timestamp (milliseconds since epoch)
    pub timestamp: u64,

    /// Message payload
    pub payload: T,

    /// Internal: Shared Redis connection for acknowledgment (avoids creating new connections)
    #[serde(skip)]
    ack_connection: Option<Arc<RwLock<MultiplexedConnection>>>,

    /// Internal: consumer group for acknowledgment
    #[serde(skip)]
    consumer_group: Option<String>,
}
127
128impl<T> Message<T> {
129 /// Acknowledge the message (mark as processed)
130 ///
131 /// Uses a shared connection to avoid creating new connections for each ack,
132 /// which significantly reduces overhead at high message rates.
133 pub async fn ack(&self) -> Result<()> {
134 if let (Some(conn), Some(group)) = (&self.ack_connection, &self.consumer_group) {
135 // Reuse shared connection for acknowledgment
136 let mut c = conn.write().await;
137 let _: () = c
138 .xack(&self.topic, group, &[&self.id])
139 .await
140 .map_err(MessagingError::Redis)?;
141 }
142 Ok(())
143 }
144
145 /// Negative acknowledgment (mark for retry)
146 pub async fn nack(&self) -> Result<()> {
147 // Redis Streams will automatically retry pending messages
148 // We just don't ACK it, and it will be redelivered
149 Ok(())
150 }
151}
152
/// Subscriber handle for receiving messages
///
/// Returned by `MessageBus::subscribe` / `MessageBus::subscribe_pattern`.
/// Dropping it closes the channel, which causes the background reader
/// task to exit the next time it tries to send.
pub struct Subscriber<T> {
    // Receiving end of the channel fed by the background reader task.
    rx: mpsc::UnboundedReceiver<Message<T>>,
    // Topic (or pattern) this subscriber was created for.
    topic: String,
}
158
159impl<T> Subscriber<T> {
160 /// Receive the next message
161 pub async fn recv(&mut self) -> Option<Message<T>> {
162 self.rx.recv().await
163 }
164
165 /// Get the topic this subscriber is listening to
166 pub fn topic(&self) -> &str {
167 &self.topic
168 }
169}
170
/// Message bus for pub/sub communication
///
/// Wraps a shared multiplexed Redis connection plus the bookkeeping
/// needed for namespaced topics and background subscription tasks.
pub struct MessageBus {
    // Shared connection used for publishing and admin commands.
    connection: Arc<RwLock<MultiplexedConnection>>,
    // Unique node identifier; used as publisher id and consumer-name prefix.
    node_id: String,
    // Topic namespace prefix (default "mecha10") for multi-fleet isolation.
    namespace: String,
    // Original connection URL, kept so background tasks can open their own clients.
    redis_url: String,
    // Handles of spawned reader tasks, keyed by full topic name.
    subscriptions: Arc<RwLock<HashMap<String, tokio::task::JoinHandle<()>>>>,
}
179
180impl MessageBus {
    /// Connect to Redis and create a new message bus
    ///
    /// Uses exponential backoff retry logic for resilient connection establishment.
    /// Thin wrapper around [`MessageBus::connect_with_retry`] with
    /// `RetryConfig::default()` (5 attempts, 100 ms initial delay, 2x backoff).
    ///
    /// # Arguments
    ///
    /// * `redis_url` - Redis connection URL (e.g., "redis://localhost:6379")
    /// * `node_id` - Unique identifier for this node
    ///
    /// # Example
    ///
    /// ```rust
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let bus = MessageBus::connect("redis://localhost:6379", "camera-node").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect(redis_url: &str, node_id: &str) -> Result<Self> {
        Self::connect_with_retry(redis_url, node_id, RetryConfig::default()).await
    }
201
202 /// Connect to Redis with custom retry configuration
203 ///
204 /// # Arguments
205 ///
206 /// * `redis_url` - Redis connection URL
207 /// * `node_id` - Unique identifier for this node
208 /// * `retry_config` - Retry configuration for exponential backoff
209 ///
210 /// # Example
211 ///
212 /// ```rust
213 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
214 /// let retry_config = RetryConfig {
215 /// max_attempts: 10,
216 /// initial_delay: Duration::from_millis(200),
217 /// max_delay: Duration::from_secs(30),
218 /// multiplier: 2.0,
219 /// };
220 /// let bus = MessageBus::connect_with_retry(
221 /// "redis://localhost:6379",
222 /// "camera-node",
223 /// retry_config
224 /// ).await?;
225 /// # Ok(())
226 /// # }
227 /// ```
228 pub async fn connect_with_retry(redis_url: &str, node_id: &str, retry_config: RetryConfig) -> Result<Self> {
229 let client = Client::open(redis_url).map_err(MessagingError::Redis)?;
230
231 let mut attempt = 0;
232 let mut delay = retry_config.initial_delay;
233
234 loop {
235 attempt += 1;
236
237 match client.get_multiplexed_async_connection().await {
238 Ok(connection) => {
239 if attempt > 1 {
240 tracing::info!(
241 node_id = node_id,
242 attempts = attempt,
243 "Successfully connected to Redis after retries"
244 );
245 }
246
247 return Ok(Self {
248 connection: Arc::new(RwLock::new(connection)),
249 node_id: node_id.to_string(),
250 namespace: "mecha10".to_string(),
251 redis_url: redis_url.to_string(),
252 subscriptions: Arc::new(RwLock::new(HashMap::new())),
253 });
254 }
255 Err(e) => {
256 if attempt >= retry_config.max_attempts {
257 tracing::error!(
258 node_id = node_id,
259 attempts = attempt,
260 error = %e,
261 "Failed to connect to Redis after maximum retry attempts"
262 );
263 return Err(MessagingError::Connection(format!(
264 "Failed to connect to Redis after {} attempts: {}",
265 attempt, e
266 )));
267 }
268
269 tracing::warn!(
270 node_id = node_id,
271 attempt = attempt,
272 max_attempts = retry_config.max_attempts,
273 delay_ms = delay.as_millis(),
274 error = %e,
275 "Redis connection failed, retrying"
276 );
277
278 tokio::time::sleep(delay).await;
279
280 // Calculate next delay with exponential backoff
281 delay = std::cmp::min(
282 Duration::from_secs_f64(delay.as_secs_f64() * retry_config.multiplier),
283 retry_config.max_delay,
284 );
285 }
286 }
287 }
288 }
289
290 /// Set the namespace for topic isolation
291 ///
292 /// Namespaces allow multiple robots/fleets to use the same Redis instance
293 /// without topic collisions.
294 pub fn set_namespace(&mut self, namespace: &str) {
295 self.namespace = namespace.to_string();
296 }
297
298 /// Publish a message to a topic
299 ///
300 /// # Arguments
301 ///
302 /// * `topic` - Topic name (e.g., "/scan", "/odom")
303 /// * `payload` - Message payload (must be serializable)
304 ///
305 /// # Example
306 ///
307 /// ```rust
308 /// # use serde::{Serialize, Deserialize};
309 /// # #[derive(Serialize, Deserialize)]
310 /// # struct Point { x: f32, y: f32 }
311 /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
312 /// bus.publish("/position", &Point { x: 1.0, y: 2.0 }).await?;
313 /// # Ok(())
314 /// # }
315 /// ```
316 pub async fn publish<T: Serialize>(&mut self, topic: &str, payload: &T) -> Result<()> {
317 let full_topic = format!("{}:{}", self.namespace, topic);
318
319 let envelope = serde_json::json!({
320 "publisher": self.node_id,
321 "timestamp": std::time::SystemTime::now()
322 .duration_since(std::time::UNIX_EPOCH)
323 .unwrap()
324 .as_millis() as u64,
325 "payload": payload,
326 });
327
328 let data = serde_json::to_string(&envelope)?;
329
330 let mut conn = self.connection.write().await;
331
332 // Use XADD with MAXLEN to keep approximately the last N messages in the stream
333 // The ~ makes it approximate/efficient (Redis can trim at any point)
334 // This prevents Redis from evicting empty streams and ensures stream persistence
335 //
336 // For camera/sensor streams that need low latency, calculate buffer based on target latency
337 // Target: 200ms buffer to balance latency vs frame loss
338 //
339 // Buffer calculation based on camera FPS (configurable via MECHA10_CAMERA_FPS env var):
340 // 15 FPS → 3 frames (200ms)
341 // 20 FPS → 4 frames (200ms)
342 // 30 FPS → 6 frames (200ms)
343 // 60 FPS → 12 frames (200ms)
344 let maxlen = if topic.contains("/camera/") || topic.contains("/video/") || topic.contains("/image/") {
345 // Read camera FPS from environment (default: 30)
346 let fps = std::env::var("MECHA10_CAMERA_FPS")
347 .ok()
348 .and_then(|s| s.parse::<f32>().ok())
349 .unwrap_or(30.0);
350
351 // Calculate buffer size for ~200ms latency
352 let target_latency_ms = 200.0;
353 let buffer_frames = ((fps * target_latency_ms) / 1000.0).ceil() as usize;
354 buffer_frames.max(2) // Minimum 2 frames to prevent underflow
355 } else {
356 100 // Default: keep last 100 messages for other topics
357 };
358
359 let _: () = conn
360 .xadd_maxlen(
361 &full_topic,
362 redis::streams::StreamMaxlen::Approx(maxlen),
363 "*",
364 &[("data", data)],
365 )
366 .await
367 .map_err(MessagingError::Redis)?;
368
369 Ok(())
370 }
371
    /// Subscribe to a topic with a consumer group
    ///
    /// Consumer groups allow multiple nodes to process messages in parallel,
    /// with each message delivered to only one consumer in the group.
    ///
    /// Creates the consumer group (retrying with exponential backoff), then
    /// spawns a background task that reads the stream via XREADGROUP and
    /// forwards decoded messages over an unbounded channel. The task exits
    /// once a send fails after the returned `Subscriber` is dropped.
    ///
    /// # Arguments
    ///
    /// * `topic` - Topic name (e.g., "/scan", "/odom")
    /// * `consumer_group` - Consumer group name (e.g., "processing", "logging")
    ///
    /// # Example
    ///
    /// ```rust
    /// # use serde::{Serialize, Deserialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct LaserScan { ranges: Vec<f32> }
    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut rx = bus.subscribe::<LaserScan>("/scan", "slam").await?;
    /// while let Some(msg) = rx.recv().await {
    ///     println!("Received: {:?}", msg.payload);
    ///     msg.ack().await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn subscribe<T: DeserializeOwned + Send + 'static>(
        &mut self,
        topic: &str,
        consumer_group: &str,
    ) -> Result<Subscriber<T>> {
        let full_topic = format!("{}:{}", self.namespace, topic);
        // Unique consumer name per subscription so multiple subscribe()
        // calls from the same node do not steal each other's deliveries.
        let consumer_id = format!("{}:{}", self.node_id, uuid::Uuid::new_v4());

        // Create consumer group with retry logic (exponential backoff)
        let max_retries = 5;
        let mut retry_count = 0;
        let mut group_created = false;

        while retry_count < max_retries {
            let mut conn = self.connection.write().await;
            // MKSTREAM creates the stream if it does not exist yet; "$"
            // starts the group at the stream's end, so only messages
            // published AFTER group creation are delivered.
            let result: std::result::Result<(), RedisError> =
                conn.xgroup_create_mkstream(&full_topic, consumer_group, "$").await;
            // Release the shared connection before any sleep below.
            drop(conn);

            match result {
                Ok(_) => {
                    tracing::debug!("Created consumer group '{}' for topic '{}'", consumer_group, full_topic);
                    group_created = true;
                    break;
                }
                Err(e) => {
                    let err_msg = e.to_string();
                    // Group already exists is OK
                    if err_msg.contains("BUSYGROUP") {
                        tracing::debug!(
                            "Consumer group '{}' already exists for topic '{}'",
                            consumer_group,
                            full_topic
                        );
                        group_created = true;
                        break;
                    }

                    retry_count += 1;
                    if retry_count < max_retries {
                        // 100ms, 200ms, 400ms, ... exponential backoff.
                        let delay = std::time::Duration::from_millis(100 * 2_u64.pow(retry_count - 1));
                        tracing::warn!(
                            "⚠️ Failed to create consumer group '{}' for topic '{}' (attempt {}/{}): {:?}. Retrying in {:?}...",
                            consumer_group, full_topic, retry_count, max_retries, e, delay
                        );
                        tokio::time::sleep(delay).await;
                    } else {
                        tracing::error!(
                            "❌ Failed to create consumer group '{}' for topic '{}' after {} attempts: {:?}",
                            consumer_group,
                            full_topic,
                            max_retries,
                            e
                        );
                        return Err(MessagingError::Subscription(format!(
                            "Failed to create consumer group after {} retries: {}",
                            max_retries, e
                        )));
                    }
                }
            }
        }

        // Defensive: should be unreachable (the loop either sets the flag
        // or returns an error), but fail loudly rather than subscribe to
        // a group that was never created.
        if !group_created {
            return Err(MessagingError::Subscription(format!(
                "Failed to verify consumer group creation for topic '{}'",
                full_topic
            )));
        }

        // Create channel for messages
        let (tx, rx) = mpsc::unbounded_channel();

        // Spawn background task to read from stream.
        // The task opens its own Redis client (from the saved URL) so a
        // broken subscription connection cannot affect the publish path.
        let redis_url = self.redis_url.clone();
        let full_topic_clone = full_topic.clone();
        let consumer_group_clone = consumer_group.to_string();
        let consumer_id_clone = consumer_id.clone();

        let task = tokio::spawn(async move {
            tracing::debug!(
                "Background subscription task started for topic '{}' group '{}'",
                full_topic_clone,
                consumer_group_clone
            );

            // Track consecutive connection failures for client recreation
            let mut consecutive_failures: u32 = 0;
            const MAX_FAILURES_BEFORE_RECREATE: u32 = 5;

            // Current client - will be recreated if connection driver terminates
            let mut current_client = match Client::open(redis_url.as_str()) {
                Ok(c) => c,
                Err(e) => {
                    tracing::error!(
                        "❌ Failed to create Redis client for topic '{}': {:?}",
                        full_topic_clone,
                        e
                    );
                    return;
                }
            };

            // Create a dedicated shared connection for acknowledgments.
            // This avoids creating new connections for each message ack:
            // every Message sent to the subscriber holds an Arc to it.
            let mut ack_connection: Arc<RwLock<MultiplexedConnection>> =
                match current_client.get_multiplexed_async_connection().await {
                    Ok(conn) => Arc::new(RwLock::new(conn)),
                    Err(e) => {
                        tracing::error!(
                            "❌ Failed to create ack connection for topic '{}': {:?}",
                            full_topic_clone,
                            e
                        );
                        return;
                    }
                };

            loop {
                // NOTE(review): this opens a fresh multiplexed connection on
                // EVERY loop iteration (i.e., per XREADGROUP batch). Reusing
                // one connection across iterations would cut per-batch
                // overhead — confirm before changing, since the per-iteration
                // reconnect is also what recovers from dead connections here.
                match current_client.get_multiplexed_async_connection().await {
                    Ok(mut conn) => {
                        // Reset failure counter on successful connection
                        consecutive_failures = 0;
                        // Read messages from stream: up to 10 entries, blocking
                        // server-side for at most 1000 ms; ">" requests only
                        // never-delivered entries for this group.
                        let result: std::result::Result<redis::streams::StreamReadReply, RedisError> = conn
                            .xread_options(
                                &[&full_topic_clone],
                                &[">"],
                                &redis::streams::StreamReadOptions::default()
                                    .group(&consumer_group_clone, &consumer_id_clone)
                                    .count(10)
                                    .block(1000),
                            )
                            .await;

                        match result {
                            Ok(reply) => {
                                tracing::debug!(
                                    "📨 Received {} keys from stream '{}'",
                                    reply.keys.len(),
                                    full_topic_clone
                                );
                                for stream_key in reply.keys {
                                    for stream_id in stream_key.ids {
                                        // Parse message: bytes -> UTF-8 -> JSON envelope
                                        // -> typed payload. Each failure mode is logged
                                        // and the entry skipped (it remains pending and
                                        // un-ACKed in the group).
                                        match stream_id.map.get("data") {
                                            Some(redis::Value::BulkString(bytes)) => {
                                                match std::str::from_utf8(bytes) {
                                                    Ok(json_str) => {
                                                        match serde_json::from_str::<serde_json::Value>(json_str) {
                                                            Ok(envelope) => {
                                                                match serde_json::from_value::<T>(
                                                                    envelope["payload"].clone(),
                                                                ) {
                                                                    Ok(payload) => {
                                                                        let msg = Message {
                                                                            id: stream_id.id.clone(),
                                                                            topic: full_topic_clone.clone(),
                                                                            publisher: envelope["publisher"]
                                                                                .as_str()
                                                                                .unwrap_or("")
                                                                                .to_string(),
                                                                            timestamp: envelope["timestamp"]
                                                                                .as_u64()
                                                                                .unwrap_or(0),
                                                                            payload,
                                                                            ack_connection: Some(
                                                                                ack_connection.clone(),
                                                                            ),
                                                                            consumer_group: Some(
                                                                                consumer_group_clone.clone(),
                                                                            ),
                                                                        };

                                                                        if tx.send(msg).is_err() {
                                                                            tracing::warn!("Channel closed, subscriber dropped for topic '{}'", full_topic_clone);
                                                                            return;
                                                                            // Subscriber dropped
                                                                        }
                                                                    }
                                                                    Err(e) => {
                                                                        tracing::warn!(
                                                                            "Failed to deserialize payload for topic '{}' msg '{}': {:?}",
                                                                            full_topic_clone, stream_id.id, e
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Err(e) => {
                                                                tracing::warn!(
                                                                    "Failed to parse JSON envelope for topic '{}' msg '{}': {:?}",
                                                                    full_topic_clone, stream_id.id, e
                                                                );
                                                            }
                                                        }
                                                    }
                                                    Err(e) => {
                                                        tracing::warn!(
                                                            "Failed to decode UTF-8 for topic '{}' msg '{}': {:?}",
                                                            full_topic_clone,
                                                            stream_id.id,
                                                            e
                                                        );
                                                    }
                                                }
                                            }
                                            Some(other) => {
                                                tracing::warn!(
                                                    "Unexpected Redis value type for topic '{}' msg '{}': {:?}",
                                                    full_topic_clone,
                                                    stream_id.id,
                                                    other
                                                );
                                            }
                                            None => {
                                                tracing::debug!(
                                                    "Missing 'data' field for topic '{}' msg '{}'",
                                                    full_topic_clone,
                                                    stream_id.id
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                let err_msg = e.to_string();

                                // If consumer group doesn't exist, try to recreate it (this is normal for new topics)
                                // NOTE(review): recreating with "$" means any entries
                                // added while the group was missing are skipped —
                                // confirm that is acceptable for these topics.
                                if err_msg.contains("NOGROUP") {
                                    tracing::debug!(
                                        "Consumer group '{}' missing for topic '{}', recreating...",
                                        consumer_group_clone,
                                        full_topic_clone
                                    );
                                    let result: std::result::Result<(), RedisError> = conn
                                        .xgroup_create_mkstream(&full_topic_clone, &consumer_group_clone, "$")
                                        .await;
                                    if let Err(create_err) = result {
                                        tracing::debug!(
                                            "Failed to recreate consumer group '{}': {:?}",
                                            consumer_group_clone,
                                            create_err
                                        );
                                    }
                                } else {
                                    tracing::debug!(
                                        "XREAD error for topic '{}' group '{}': {:?}",
                                        full_topic_clone,
                                        consumer_group_clone,
                                        e
                                    );
                                }

                                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                            }
                        }
                    }
                    Err(e) => {
                        consecutive_failures += 1;
                        tracing::debug!(
                            "Failed to get Redis connection for topic '{}' (attempt {}): {:?}",
                            full_topic_clone,
                            consecutive_failures,
                            e
                        );

                        // If we've failed too many times, recreate the client
                        if consecutive_failures >= MAX_FAILURES_BEFORE_RECREATE {
                            tracing::debug!(
                                "Recreating Redis client for topic '{}' after {} consecutive failures",
                                full_topic_clone,
                                consecutive_failures
                            );

                            match Client::open(redis_url.as_str()) {
                                Ok(new_client) => {
                                    current_client = new_client;
                                    consecutive_failures = 0;

                                    // Also recreate the ack connection.
                                    // Messages already delivered keep their Arc to the
                                    // OLD connection; only new messages see this one.
                                    match current_client.get_multiplexed_async_connection().await {
                                        Ok(conn) => {
                                            ack_connection = Arc::new(RwLock::new(conn));
                                            tracing::debug!(
                                                "Successfully recreated Redis client for topic '{}'",
                                                full_topic_clone
                                            );
                                        }
                                        Err(e) => {
                                            tracing::debug!(
                                                "Failed to create new ack connection for topic '{}': {:?}",
                                                full_topic_clone,
                                                e
                                            );
                                        }
                                    }
                                }
                                Err(e) => {
                                    tracing::debug!(
                                        "Failed to recreate Redis client for topic '{}': {:?}",
                                        full_topic_clone,
                                        e
                                    );
                                }
                            }
                        }

                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    }
                }
            }
        });

        // Store task handle so close() can abort it later.
        // NOTE(review): a second subscribe() to the same topic replaces the
        // stored handle, orphaning the first task — confirm intended.
        let mut subs = self.subscriptions.write().await;
        subs.insert(full_topic, task);

        Ok(Subscriber {
            rx,
            topic: topic.to_string(),
        })
    }
720
    /// Subscribe to multiple topics matching a wildcard pattern
    ///
    /// Enables aggregated subscriptions across multiple topics using wildcard patterns.
    /// Useful for collecting data from multiple robots (e.g., `*/camera/rgb` subscribes
    /// to camera feeds from all robots).
    ///
    /// # Arguments
    ///
    /// * `pattern` - Wildcard pattern with `*` as placeholder (e.g., "*/camera/rgb", "robot-*/scan")
    /// * `consumer_group` - Consumer group name for load balancing
    ///
    /// # Pattern Syntax
    ///
    /// - `*` matches any segment in the topic path
    /// - `*/camera/rgb` matches "robot-1/camera/rgb", "robot-2/camera/rgb", etc.
    /// - `robot-*/scan` matches "robot-1/scan", "robot-alpha/scan", etc.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use serde::{Serialize, Deserialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Frame { data: Vec<u8> }
    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
    /// // Subscribe to camera feeds from ALL robots
    /// let mut rx = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-processing").await?;
    ///
    /// while let Some(msg) = rx.recv().await {
    ///     println!("Received frame from robot: {}", msg.topic);
    ///     // Process frame from any robot
    ///     msg.ack().await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Implementation
    ///
    /// This method:
    /// 1. Converts wildcard pattern to Redis KEYS pattern
    /// 2. Discovers all matching topics using `discover_topics()`
    /// 3. Subscribes to each discovered topic with the same consumer group
    /// 4. Multiplexes messages from all topics into a single receiver
    ///
    /// # Note
    ///
    /// This performs initial topic discovery using Redis KEYS command. For dynamic
    /// topic discovery (new topics appearing after subscription), consider polling
    /// `discover_topics()` periodically and creating new subscriptions.
    ///
    /// NOTE(review): unlike `subscribe`, this path (a) ignores consumer-group
    /// creation errors other than implicitly tolerating BUSYGROUP, and
    /// (b) silently drops entries that fail UTF-8/JSON/payload decoding
    /// instead of logging them — confirm whether the asymmetry is intended.
    pub async fn subscribe_pattern<T: DeserializeOwned + Send + 'static>(
        &mut self,
        pattern: &str,
        consumer_group: &str,
    ) -> Result<Subscriber<T>> {
        // Convert wildcard pattern to Redis KEYS pattern
        // Example: "*/camera/rgb" -> "mecha10:*/camera/rgb"
        let full_pattern = format!("{}:{}", self.namespace, pattern);

        // Discover all topics matching the pattern.
        // discover_topics strips the namespace prefix again, so `topics`
        // holds bare topic paths.
        let topics = self.discover_topics(&full_pattern).await?;

        if topics.is_empty() {
            tracing::warn!(
                pattern = pattern,
                "No topics found matching pattern, subscriber will not receive messages until topics are created"
            );
        }

        // Create a single channel for all matching topics; every per-topic
        // task below gets a clone of the sender.
        let (tx, rx) = mpsc::unbounded_channel();

        // Subscribe to each discovered topic
        for topic in topics {
            let full_topic = format!("{}:{}", self.namespace, topic);
            let consumer_id = format!("{}:{}", self.node_id, uuid::Uuid::new_v4());

            // Create consumer group if it doesn't exist.
            // Result deliberately discarded: BUSYGROUP (already exists) is
            // the common case. NOTE(review): real failures are also
            // swallowed here — the background task's NOGROUP handling is
            // the only fallback.
            let mut conn = self.connection.write().await;
            let _: std::result::Result<(), RedisError> =
                conn.xgroup_create_mkstream(&full_topic, consumer_group, "$").await;
            drop(conn);

            // Spawn background task to read from this stream
            let redis_url = self.redis_url.clone();
            let full_topic_clone = full_topic.clone();
            let consumer_group_clone = consumer_group.to_string();
            let consumer_id_clone = consumer_id.clone();
            let tx_clone = tx.clone();
            // Bare topic path; used as Message::topic so consumers can
            // tell which robot/stream a message came from.
            let topic_clone = topic.clone();

            let task = tokio::spawn(async move {
                // Track consecutive connection failures for client recreation
                let mut consecutive_failures: u32 = 0;
                const MAX_FAILURES_BEFORE_RECREATE: u32 = 5;

                // Current client - will be recreated if connection driver terminates
                let mut current_client = match Client::open(redis_url.as_str()) {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::error!(
                            "❌ Failed to create Redis client for topic '{}': {:?}",
                            full_topic_clone,
                            e
                        );
                        return;
                    }
                };

                // Create a dedicated shared connection for acknowledgments
                let mut ack_connection: Arc<RwLock<MultiplexedConnection>> =
                    match current_client.get_multiplexed_async_connection().await {
                        Ok(conn) => Arc::new(RwLock::new(conn)),
                        Err(e) => {
                            tracing::error!(
                                "❌ Failed to create ack connection for topic '{}': {:?}",
                                full_topic_clone,
                                e
                            );
                            return;
                        }
                    };

                loop {
                    // NOTE(review): opens a fresh multiplexed connection per
                    // XREADGROUP batch, same as `subscribe` — see note there.
                    match current_client.get_multiplexed_async_connection().await {
                        Ok(mut conn) => {
                            // Reset failure counter on successful connection
                            consecutive_failures = 0;
                            // Read messages from stream: up to 10 entries,
                            // blocking server-side for at most 1000 ms.
                            let result: std::result::Result<redis::streams::StreamReadReply, RedisError> = conn
                                .xread_options(
                                    &[&full_topic_clone],
                                    &[">"],
                                    &redis::streams::StreamReadOptions::default()
                                        .group(&consumer_group_clone, &consumer_id_clone)
                                        .count(10)
                                        .block(1000),
                                )
                                .await;

                            match result {
                                Ok(reply) => {
                                    for stream_key in reply.keys {
                                        for stream_id in stream_key.ids {
                                            // Parse message; any decode failure skips the
                                            // entry (it stays pending, un-ACKed, unlogged).
                                            if let Some(redis::Value::BulkString(bytes)) = stream_id.map.get("data") {
                                                if let Ok(json_str) = std::str::from_utf8(bytes) {
                                                    if let Ok(envelope) =
                                                        serde_json::from_str::<serde_json::Value>(json_str)
                                                    {
                                                        if let Ok(payload) =
                                                            serde_json::from_value::<T>(envelope["payload"].clone())
                                                        {
                                                            let msg = Message {
                                                                id: stream_id.id.clone(),
                                                                topic: topic_clone.clone(),
                                                                publisher: envelope["publisher"]
                                                                    .as_str()
                                                                    .unwrap_or("")
                                                                    .to_string(),
                                                                timestamp: envelope["timestamp"].as_u64().unwrap_or(0),
                                                                payload,
                                                                ack_connection: Some(ack_connection.clone()),
                                                                consumer_group: Some(consumer_group_clone.clone()),
                                                            };

                                                            if tx_clone.send(msg).is_err() {
                                                                return; // Subscriber dropped
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                                Err(_) => {
                                    // Read error (e.g. NOGROUP, timeout): back off
                                    // and retry on the next loop iteration.
                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                                }
                            }
                        }
                        Err(e) => {
                            consecutive_failures += 1;
                            tracing::warn!(
                                "❌ Failed to get Redis connection for topic '{}' (attempt {}): {:?}",
                                full_topic_clone,
                                consecutive_failures,
                                e
                            );

                            // If we've failed too many times, recreate the client
                            if consecutive_failures >= MAX_FAILURES_BEFORE_RECREATE {
                                tracing::warn!(
                                    "🔄 Recreating Redis client for topic '{}' after {} consecutive failures",
                                    full_topic_clone,
                                    consecutive_failures
                                );

                                match Client::open(redis_url.as_str()) {
                                    Ok(new_client) => {
                                        current_client = new_client;
                                        consecutive_failures = 0;

                                        // Also recreate the ack connection
                                        match current_client.get_multiplexed_async_connection().await {
                                            Ok(conn) => {
                                                ack_connection = Arc::new(RwLock::new(conn));
                                                tracing::info!(
                                                    "✅ Successfully recreated Redis client and ack connection for topic '{}'",
                                                    full_topic_clone
                                                );
                                            }
                                            Err(e) => {
                                                tracing::error!(
                                                    "❌ Failed to create new ack connection for topic '{}': {:?}",
                                                    full_topic_clone,
                                                    e
                                                );
                                            }
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            "❌ Failed to recreate Redis client for topic '{}': {:?}",
                                            full_topic_clone,
                                            e
                                        );
                                    }
                                }
                            }

                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        }
                    }
                }
            });

            // Store task handle, keyed to distinguish pattern subscriptions
            // from plain per-topic ones.
            let mut subs = self.subscriptions.write().await;
            subs.insert(format!("pattern:{}:{}", pattern, full_topic), task);
        }

        // The returned Subscriber reports the PATTERN as its topic; the
        // per-message topic field carries the concrete source topic.
        Ok(Subscriber {
            rx,
            topic: pattern.to_string(),
        })
    }
967
968 /// Discover topics matching a Redis KEYS pattern
969 ///
970 /// Uses Redis KEYS command to find all streams matching the given pattern.
971 /// This is useful for aggregated topic subscriptions.
972 ///
973 /// # Arguments
974 ///
975 /// * `pattern` - Redis KEYS pattern (e.g., "mecha10:topic:/sensor/camera/*")
976 ///
977 /// # Returns
978 ///
979 /// A vector of topic paths (without the namespace prefix)
980 ///
981 /// # Example
982 ///
983 /// ```rust,no_run
984 /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
985 /// // Discover all camera topics
986 /// let topics = bus.discover_topics("mecha10:topic:/sensor/camera/*").await?;
987 /// println!("Found {} camera topics", topics.len());
988 /// # Ok(())
989 /// # }
990 /// ```
991 ///
992 /// # Note
993 ///
994 /// This uses Redis KEYS command which can be slow on large datasets.
995 /// For production use with many topics, consider using SCAN instead.
996 pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
997 use redis::AsyncCommands;
998
999 let mut conn = self.connection.write().await;
1000
1001 // Use KEYS command to find matching topics
1002 let keys: Vec<String> = conn.keys(pattern).await.map_err(MessagingError::Redis)?;
1003
1004 // Strip namespace prefix from each key
1005 let prefix = format!("{}:", self.namespace);
1006 let topics: Vec<String> = keys
1007 .into_iter()
1008 .filter_map(|key| key.strip_prefix(&prefix).map(|s| s.to_string()))
1009 .collect();
1010
1011 Ok(topics)
1012 }
1013
1014 /// Unsubscribe from all topics and close connections
1015 pub async fn close(&mut self) -> Result<()> {
1016 let mut subs = self.subscriptions.write().await;
1017 for (_, task) in subs.drain() {
1018 task.abort();
1019 }
1020 Ok(())
1021 }
1022}