Skip to main content

mecha10_messaging/
lib.rs

1//! Mecha10 Messaging Layer
2//!
3//! This crate provides a Redis Streams-based pub/sub messaging system for
4//! inter-node communication in the Mecha10 framework.
5//!
6//! # Features
7//!
8//! - Type-safe message serialization/deserialization
9//! - Redis Streams backend for reliable message delivery
10//! - Topic-based routing
11//! - Consumer groups for load balancing
12//! - Message acknowledgment and retry
13//! - Automatic reconnection
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10_messaging::{MessageBus, Message};
19//! use serde::{Deserialize, Serialize};
20//!
21//! #[derive(Debug, Serialize, Deserialize)]
22//! struct LaserScan {
23//!     ranges: Vec<f32>,
24//!     timestamp: u64,
25//! }
26//!
27//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//! let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
29//!
30//! // Subscribe to topic
31//! let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
32//!
33//! // Publish message
34//! let scan = LaserScan { ranges: vec![1.0, 2.0, 3.0], timestamp: 12345 };
35//! bus.publish("/scan", &scan).await?;
36//!
37//! // Receive message
38//! if let Some(msg) = rx.recv().await {
39//!     println!("Received scan: {:?}", msg.payload);
40//!     msg.ack().await?;
41//! }
42//! # Ok(())
43//! # }
44//! ```
45
46use redis::aio::MultiplexedConnection;
47use redis::{AsyncCommands, Client, RedisError};
48use serde::{de::DeserializeOwned, Deserialize, Serialize};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::time::Duration;
52use thiserror::Error;
53use tokio::sync::{mpsc, RwLock};
54
55/// Messaging errors
56#[derive(Debug, Error)]
57pub enum MessagingError {
58    #[error("Redis error: {0}")]
59    Redis(#[from] RedisError),
60
61    #[error("Serialization error: {0}")]
62    Serialization(#[from] serde_json::Error),
63
64    #[error("Connection error: {0}")]
65    Connection(String),
66
67    #[error("Subscription error: {0}")]
68    Subscription(String),
69
70    #[error("Channel closed")]
71    ChannelClosed,
72}
73
74/// Result type for messaging operations
75pub type Result<T> = std::result::Result<T, MessagingError>;
76
/// Exponential-backoff settings used while establishing the Redis connection
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Upper bound on the number of connection attempts (the first try counts as one)
    pub max_attempts: usize,
    /// Pause inserted after the first failed attempt
    pub initial_delay: Duration,
    /// Ceiling applied to the pause as it grows
    pub max_delay: Duration,
    /// Factor by which the pause is scaled after every failed attempt
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Five attempts, starting with a 100 ms pause that doubles each time, capped at 10 s.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(10);

        RetryConfig {
            max_attempts: 5,
            initial_delay,
            max_delay,
            multiplier: 2.0,
        }
    }
}
100
101/// A message envelope containing metadata and payload
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct Message<T> {
104    /// Message ID (Redis Stream ID)
105    pub id: String,
106
107    /// Topic the message was published to
108    pub topic: String,
109
110    /// Publisher node ID
111    pub publisher: String,
112
113    /// Timestamp (milliseconds since epoch)
114    pub timestamp: u64,
115
116    /// Message payload
117    pub payload: T,
118
119    /// Internal: Shared Redis connection for acknowledgment (avoids creating new connections)
120    #[serde(skip)]
121    ack_connection: Option<Arc<RwLock<MultiplexedConnection>>>,
122
123    /// Internal: consumer group for acknowledgment
124    #[serde(skip)]
125    consumer_group: Option<String>,
126}
127
128impl<T> Message<T> {
129    /// Acknowledge the message (mark as processed)
130    ///
131    /// Uses a shared connection to avoid creating new connections for each ack,
132    /// which significantly reduces overhead at high message rates.
133    pub async fn ack(&self) -> Result<()> {
134        if let (Some(conn), Some(group)) = (&self.ack_connection, &self.consumer_group) {
135            // Reuse shared connection for acknowledgment
136            let mut c = conn.write().await;
137            let _: () = c
138                .xack(&self.topic, group, &[&self.id])
139                .await
140                .map_err(MessagingError::Redis)?;
141        }
142        Ok(())
143    }
144
145    /// Negative acknowledgment (mark for retry)
146    pub async fn nack(&self) -> Result<()> {
147        // Redis Streams will automatically retry pending messages
148        // We just don't ACK it, and it will be redelivered
149        Ok(())
150    }
151}
152
153/// Subscriber handle for receiving messages
154pub struct Subscriber<T> {
155    rx: mpsc::UnboundedReceiver<Message<T>>,
156    topic: String,
157}
158
159impl<T> Subscriber<T> {
160    /// Receive the next message
161    pub async fn recv(&mut self) -> Option<Message<T>> {
162        self.rx.recv().await
163    }
164
165    /// Get the topic this subscriber is listening to
166    pub fn topic(&self) -> &str {
167        &self.topic
168    }
169}
170
171/// Message bus for pub/sub communication
172pub struct MessageBus {
173    connection: Arc<RwLock<MultiplexedConnection>>,
174    node_id: String,
175    namespace: String,
176    redis_url: String,
177    subscriptions: Arc<RwLock<HashMap<String, tokio::task::JoinHandle<()>>>>,
178}
179
180impl MessageBus {
181    /// Connect to Redis and create a new message bus
182    ///
183    /// Uses exponential backoff retry logic for resilient connection establishment.
184    ///
185    /// # Arguments
186    ///
187    /// * `redis_url` - Redis connection URL (e.g., "redis://localhost:6379")
188    /// * `node_id` - Unique identifier for this node
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
194    /// let bus = MessageBus::connect("redis://localhost:6379", "camera-node").await?;
195    /// # Ok(())
196    /// # }
197    /// ```
198    pub async fn connect(redis_url: &str, node_id: &str) -> Result<Self> {
199        Self::connect_with_retry(redis_url, node_id, RetryConfig::default()).await
200    }
201
202    /// Connect to Redis with custom retry configuration
203    ///
204    /// # Arguments
205    ///
206    /// * `redis_url` - Redis connection URL
207    /// * `node_id` - Unique identifier for this node
208    /// * `retry_config` - Retry configuration for exponential backoff
209    ///
210    /// # Example
211    ///
212    /// ```rust
213    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
214    /// let retry_config = RetryConfig {
215    ///     max_attempts: 10,
216    ///     initial_delay: Duration::from_millis(200),
217    ///     max_delay: Duration::from_secs(30),
218    ///     multiplier: 2.0,
219    /// };
220    /// let bus = MessageBus::connect_with_retry(
221    ///     "redis://localhost:6379",
222    ///     "camera-node",
223    ///     retry_config
224    /// ).await?;
225    /// # Ok(())
226    /// # }
227    /// ```
228    pub async fn connect_with_retry(redis_url: &str, node_id: &str, retry_config: RetryConfig) -> Result<Self> {
229        let client = Client::open(redis_url).map_err(MessagingError::Redis)?;
230
231        let mut attempt = 0;
232        let mut delay = retry_config.initial_delay;
233
234        loop {
235            attempt += 1;
236
237            match client.get_multiplexed_async_connection().await {
238                Ok(connection) => {
239                    if attempt > 1 {
240                        tracing::info!(
241                            node_id = node_id,
242                            attempts = attempt,
243                            "Successfully connected to Redis after retries"
244                        );
245                    }
246
247                    return Ok(Self {
248                        connection: Arc::new(RwLock::new(connection)),
249                        node_id: node_id.to_string(),
250                        namespace: "mecha10".to_string(),
251                        redis_url: redis_url.to_string(),
252                        subscriptions: Arc::new(RwLock::new(HashMap::new())),
253                    });
254                }
255                Err(e) => {
256                    if attempt >= retry_config.max_attempts {
257                        tracing::error!(
258                            node_id = node_id,
259                            attempts = attempt,
260                            error = %e,
261                            "Failed to connect to Redis after maximum retry attempts"
262                        );
263                        return Err(MessagingError::Connection(format!(
264                            "Failed to connect to Redis after {} attempts: {}",
265                            attempt, e
266                        )));
267                    }
268
269                    tracing::warn!(
270                        node_id = node_id,
271                        attempt = attempt,
272                        max_attempts = retry_config.max_attempts,
273                        delay_ms = delay.as_millis(),
274                        error = %e,
275                        "Redis connection failed, retrying"
276                    );
277
278                    tokio::time::sleep(delay).await;
279
280                    // Calculate next delay with exponential backoff
281                    delay = std::cmp::min(
282                        Duration::from_secs_f64(delay.as_secs_f64() * retry_config.multiplier),
283                        retry_config.max_delay,
284                    );
285                }
286            }
287        }
288    }
289
290    /// Set the namespace for topic isolation
291    ///
292    /// Namespaces allow multiple robots/fleets to use the same Redis instance
293    /// without topic collisions.
294    pub fn set_namespace(&mut self, namespace: &str) {
295        self.namespace = namespace.to_string();
296    }
297
298    /// Publish a message to a topic
299    ///
300    /// # Arguments
301    ///
302    /// * `topic` - Topic name (e.g., "/scan", "/odom")
303    /// * `payload` - Message payload (must be serializable)
304    ///
305    /// # Example
306    ///
307    /// ```rust
308    /// # use serde::{Serialize, Deserialize};
309    /// # #[derive(Serialize, Deserialize)]
310    /// # struct Point { x: f32, y: f32 }
311    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
312    /// bus.publish("/position", &Point { x: 1.0, y: 2.0 }).await?;
313    /// # Ok(())
314    /// # }
315    /// ```
316    pub async fn publish<T: Serialize>(&mut self, topic: &str, payload: &T) -> Result<()> {
317        let full_topic = format!("{}:{}", self.namespace, topic);
318
319        let envelope = serde_json::json!({
320            "publisher": self.node_id,
321            "timestamp": std::time::SystemTime::now()
322                .duration_since(std::time::UNIX_EPOCH)
323                .unwrap()
324                .as_millis() as u64,
325            "payload": payload,
326        });
327
328        let data = serde_json::to_string(&envelope)?;
329
330        let mut conn = self.connection.write().await;
331
332        // Use XADD with MAXLEN to keep approximately the last N messages in the stream
333        // The ~ makes it approximate/efficient (Redis can trim at any point)
334        // This prevents Redis from evicting empty streams and ensures stream persistence
335        //
336        // For camera/sensor streams that need low latency, calculate buffer based on target latency
337        // Target: 200ms buffer to balance latency vs frame loss
338        //
339        // Buffer calculation based on camera FPS (configurable via MECHA10_CAMERA_FPS env var):
340        //   15 FPS → 3 frames (200ms)
341        //   20 FPS → 4 frames (200ms)
342        //   30 FPS → 6 frames (200ms)
343        //   60 FPS → 12 frames (200ms)
344        let maxlen = if topic.contains("/camera/") || topic.contains("/video/") || topic.contains("/image/") {
345            // Read camera FPS from environment (default: 30)
346            let fps = std::env::var("MECHA10_CAMERA_FPS")
347                .ok()
348                .and_then(|s| s.parse::<f32>().ok())
349                .unwrap_or(30.0);
350
351            // Calculate buffer size for ~200ms latency
352            let target_latency_ms = 200.0;
353            let buffer_frames = ((fps * target_latency_ms) / 1000.0).ceil() as usize;
354            buffer_frames.max(2) // Minimum 2 frames to prevent underflow
355        } else {
356            100 // Default: keep last 100 messages for other topics
357        };
358
359        let _: () = conn
360            .xadd_maxlen(
361                &full_topic,
362                redis::streams::StreamMaxlen::Approx(maxlen),
363                "*",
364                &[("data", data)],
365            )
366            .await
367            .map_err(MessagingError::Redis)?;
368
369        Ok(())
370    }
371
372    /// Subscribe to a topic with a consumer group
373    ///
374    /// Consumer groups allow multiple nodes to process messages in parallel,
375    /// with each message delivered to only one consumer in the group.
376    ///
377    /// # Arguments
378    ///
379    /// * `topic` - Topic name (e.g., "/scan", "/odom")
380    /// * `consumer_group` - Consumer group name (e.g., "processing", "logging")
381    ///
382    /// # Example
383    ///
384    /// ```rust
385    /// # use serde::{Serialize, Deserialize};
386    /// # #[derive(Serialize, Deserialize)]
387    /// # struct LaserScan { ranges: Vec<f32> }
388    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
389    /// let mut rx = bus.subscribe::<LaserScan>("/scan", "slam").await?;
390    /// while let Some(msg) = rx.recv().await {
391    ///     println!("Received: {:?}", msg.payload);
392    ///     msg.ack().await?;
393    /// }
394    /// # Ok(())
395    /// # }
396    /// ```
397    pub async fn subscribe<T: DeserializeOwned + Send + 'static>(
398        &mut self,
399        topic: &str,
400        consumer_group: &str,
401    ) -> Result<Subscriber<T>> {
402        let full_topic = format!("{}:{}", self.namespace, topic);
403        let consumer_id = format!("{}:{}", self.node_id, uuid::Uuid::new_v4());
404
405        // Create consumer group with retry logic (exponential backoff)
406        let max_retries = 5;
407        let mut retry_count = 0;
408        let mut group_created = false;
409
410        while retry_count < max_retries {
411            let mut conn = self.connection.write().await;
412            let result: std::result::Result<(), RedisError> =
413                conn.xgroup_create_mkstream(&full_topic, consumer_group, "$").await;
414            drop(conn);
415
416            match result {
417                Ok(_) => {
418                    tracing::debug!("Created consumer group '{}' for topic '{}'", consumer_group, full_topic);
419                    group_created = true;
420                    break;
421                }
422                Err(e) => {
423                    let err_msg = e.to_string();
424                    // Group already exists is OK
425                    if err_msg.contains("BUSYGROUP") {
426                        tracing::debug!(
427                            "Consumer group '{}' already exists for topic '{}'",
428                            consumer_group,
429                            full_topic
430                        );
431                        group_created = true;
432                        break;
433                    }
434
435                    retry_count += 1;
436                    if retry_count < max_retries {
437                        let delay = std::time::Duration::from_millis(100 * 2_u64.pow(retry_count - 1));
438                        tracing::warn!(
439                            "⚠️  Failed to create consumer group '{}' for topic '{}' (attempt {}/{}): {:?}. Retrying in {:?}...",
440                            consumer_group, full_topic, retry_count, max_retries, e, delay
441                        );
442                        tokio::time::sleep(delay).await;
443                    } else {
444                        tracing::error!(
445                            "❌ Failed to create consumer group '{}' for topic '{}' after {} attempts: {:?}",
446                            consumer_group,
447                            full_topic,
448                            max_retries,
449                            e
450                        );
451                        return Err(MessagingError::Subscription(format!(
452                            "Failed to create consumer group after {} retries: {}",
453                            max_retries, e
454                        )));
455                    }
456                }
457            }
458        }
459
460        if !group_created {
461            return Err(MessagingError::Subscription(format!(
462                "Failed to verify consumer group creation for topic '{}'",
463                full_topic
464            )));
465        }
466
467        // Create channel for messages
468        let (tx, rx) = mpsc::unbounded_channel();
469
470        // Spawn background task to read from stream
471        let redis_url = self.redis_url.clone();
472        let full_topic_clone = full_topic.clone();
473        let consumer_group_clone = consumer_group.to_string();
474        let consumer_id_clone = consumer_id.clone();
475
476        let task = tokio::spawn(async move {
477            tracing::debug!(
478                "Background subscription task started for topic '{}' group '{}'",
479                full_topic_clone,
480                consumer_group_clone
481            );
482
483            // Track consecutive connection failures for client recreation
484            let mut consecutive_failures: u32 = 0;
485            const MAX_FAILURES_BEFORE_RECREATE: u32 = 5;
486
487            // Current client - will be recreated if connection driver terminates
488            let mut current_client = match Client::open(redis_url.as_str()) {
489                Ok(c) => c,
490                Err(e) => {
491                    tracing::error!(
492                        "❌ Failed to create Redis client for topic '{}': {:?}",
493                        full_topic_clone,
494                        e
495                    );
496                    return;
497                }
498            };
499
500            // Create a dedicated shared connection for acknowledgments
501            // This avoids creating new connections for each message ack
502            let mut ack_connection: Arc<RwLock<MultiplexedConnection>> =
503                match current_client.get_multiplexed_async_connection().await {
504                    Ok(conn) => Arc::new(RwLock::new(conn)),
505                    Err(e) => {
506                        tracing::error!(
507                            "❌ Failed to create ack connection for topic '{}': {:?}",
508                            full_topic_clone,
509                            e
510                        );
511                        return;
512                    }
513                };
514
515            loop {
516                match current_client.get_multiplexed_async_connection().await {
517                    Ok(mut conn) => {
518                        // Reset failure counter on successful connection
519                        consecutive_failures = 0;
520                        // Read messages from stream
521                        let result: std::result::Result<redis::streams::StreamReadReply, RedisError> = conn
522                            .xread_options(
523                                &[&full_topic_clone],
524                                &[">"],
525                                &redis::streams::StreamReadOptions::default()
526                                    .group(&consumer_group_clone, &consumer_id_clone)
527                                    .count(10)
528                                    .block(1000),
529                            )
530                            .await;
531
532                        match result {
533                            Ok(reply) => {
534                                tracing::debug!(
535                                    "📨 Received {} keys from stream '{}'",
536                                    reply.keys.len(),
537                                    full_topic_clone
538                                );
539                                for stream_key in reply.keys {
540                                    for stream_id in stream_key.ids {
541                                        // Parse message
542                                        match stream_id.map.get("data") {
543                                            Some(redis::Value::BulkString(bytes)) => {
544                                                match std::str::from_utf8(bytes) {
545                                                    Ok(json_str) => {
546                                                        match serde_json::from_str::<serde_json::Value>(json_str) {
547                                                            Ok(envelope) => {
548                                                                match serde_json::from_value::<T>(
549                                                                    envelope["payload"].clone(),
550                                                                ) {
551                                                                    Ok(payload) => {
552                                                                        let msg = Message {
553                                                                            id: stream_id.id.clone(),
554                                                                            topic: full_topic_clone.clone(),
555                                                                            publisher: envelope["publisher"]
556                                                                                .as_str()
557                                                                                .unwrap_or("")
558                                                                                .to_string(),
559                                                                            timestamp: envelope["timestamp"]
560                                                                                .as_u64()
561                                                                                .unwrap_or(0),
562                                                                            payload,
563                                                                            ack_connection: Some(
564                                                                                ack_connection.clone(),
565                                                                            ),
566                                                                            consumer_group: Some(
567                                                                                consumer_group_clone.clone(),
568                                                                            ),
569                                                                        };
570
571                                                                        if tx.send(msg).is_err() {
572                                                                            tracing::warn!("Channel closed, subscriber dropped for topic '{}'", full_topic_clone);
573                                                                            return;
574                                                                            // Subscriber dropped
575                                                                        }
576                                                                    }
577                                                                    Err(e) => {
578                                                                        tracing::warn!(
579                                                                            "Failed to deserialize payload for topic '{}' msg '{}': {:?}",
580                                                                            full_topic_clone, stream_id.id, e
581                                                                        );
582                                                                    }
583                                                                }
584                                                            }
585                                                            Err(e) => {
586                                                                tracing::warn!(
587                                                                    "Failed to parse JSON envelope for topic '{}' msg '{}': {:?}",
588                                                                    full_topic_clone, stream_id.id, e
589                                                                );
590                                                            }
591                                                        }
592                                                    }
593                                                    Err(e) => {
594                                                        tracing::warn!(
595                                                            "Failed to decode UTF-8 for topic '{}' msg '{}': {:?}",
596                                                            full_topic_clone,
597                                                            stream_id.id,
598                                                            e
599                                                        );
600                                                    }
601                                                }
602                                            }
603                                            Some(other) => {
604                                                tracing::warn!(
605                                                    "Unexpected Redis value type for topic '{}' msg '{}': {:?}",
606                                                    full_topic_clone,
607                                                    stream_id.id,
608                                                    other
609                                                );
610                                            }
611                                            None => {
612                                                tracing::debug!(
613                                                    "Missing 'data' field for topic '{}' msg '{}'",
614                                                    full_topic_clone,
615                                                    stream_id.id
616                                                );
617                                            }
618                                        }
619                                    }
620                                }
621                            }
622                            Err(e) => {
623                                let err_msg = e.to_string();
624
625                                // If consumer group doesn't exist, try to recreate it (this is normal for new topics)
626                                if err_msg.contains("NOGROUP") {
627                                    tracing::debug!(
628                                        "Consumer group '{}' missing for topic '{}', recreating...",
629                                        consumer_group_clone,
630                                        full_topic_clone
631                                    );
632                                    let result: std::result::Result<(), RedisError> = conn
633                                        .xgroup_create_mkstream(&full_topic_clone, &consumer_group_clone, "$")
634                                        .await;
635                                    if let Err(create_err) = result {
636                                        tracing::debug!(
637                                            "Failed to recreate consumer group '{}': {:?}",
638                                            consumer_group_clone,
639                                            create_err
640                                        );
641                                    }
642                                } else {
643                                    tracing::debug!(
644                                        "XREAD error for topic '{}' group '{}': {:?}",
645                                        full_topic_clone,
646                                        consumer_group_clone,
647                                        e
648                                    );
649                                }
650
651                                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
652                            }
653                        }
654                    }
655                    Err(e) => {
656                        consecutive_failures += 1;
657                        tracing::debug!(
658                            "Failed to get Redis connection for topic '{}' (attempt {}): {:?}",
659                            full_topic_clone,
660                            consecutive_failures,
661                            e
662                        );
663
664                        // If we've failed too many times, recreate the client
665                        if consecutive_failures >= MAX_FAILURES_BEFORE_RECREATE {
666                            tracing::debug!(
667                                "Recreating Redis client for topic '{}' after {} consecutive failures",
668                                full_topic_clone,
669                                consecutive_failures
670                            );
671
672                            match Client::open(redis_url.as_str()) {
673                                Ok(new_client) => {
674                                    current_client = new_client;
675                                    consecutive_failures = 0;
676
677                                    // Also recreate the ack connection
678                                    match current_client.get_multiplexed_async_connection().await {
679                                        Ok(conn) => {
680                                            ack_connection = Arc::new(RwLock::new(conn));
681                                            tracing::debug!(
682                                                "Successfully recreated Redis client for topic '{}'",
683                                                full_topic_clone
684                                            );
685                                        }
686                                        Err(e) => {
687                                            tracing::debug!(
688                                                "Failed to create new ack connection for topic '{}': {:?}",
689                                                full_topic_clone,
690                                                e
691                                            );
692                                        }
693                                    }
694                                }
695                                Err(e) => {
696                                    tracing::debug!(
697                                        "Failed to recreate Redis client for topic '{}': {:?}",
698                                        full_topic_clone,
699                                        e
700                                    );
701                                }
702                            }
703                        }
704
705                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
706                    }
707                }
708            }
709        });
710
711        // Store task handle
712        let mut subs = self.subscriptions.write().await;
713        subs.insert(full_topic, task);
714
715        Ok(Subscriber {
716            rx,
717            topic: topic.to_string(),
718        })
719    }
720
721    /// Subscribe to multiple topics matching a wildcard pattern
722    ///
723    /// Enables aggregated subscriptions across multiple topics using wildcard patterns.
724    /// Useful for collecting data from multiple robots (e.g., `*/camera/rgb` subscribes
725    /// to camera feeds from all robots).
726    ///
727    /// # Arguments
728    ///
729    /// * `pattern` - Wildcard pattern with `*` as placeholder (e.g., "*/camera/rgb", "robot-*/scan")
730    /// * `consumer_group` - Consumer group name for load balancing
731    ///
732    /// # Pattern Syntax
733    ///
734    /// - `*` matches any segment in the topic path
735    /// - `*/camera/rgb` matches "robot-1/camera/rgb", "robot-2/camera/rgb", etc.
736    /// - `robot-*/scan` matches "robot-1/scan", "robot-alpha/scan", etc.
737    ///
738    /// # Example
739    ///
740    /// ```rust,no_run
741    /// # use serde::{Serialize, Deserialize};
742    /// # #[derive(Serialize, Deserialize)]
743    /// # struct Frame { data: Vec<u8> }
744    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
745    /// // Subscribe to camera feeds from ALL robots
746    /// let mut rx = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-processing").await?;
747    ///
748    /// while let Some(msg) = rx.recv().await {
749    ///     println!("Received frame from robot: {}", msg.topic);
750    ///     // Process frame from any robot
751    ///     msg.ack().await?;
752    /// }
753    /// # Ok(())
754    /// # }
755    /// ```
756    ///
757    /// # Implementation
758    ///
759    /// This method:
760    /// 1. Converts wildcard pattern to Redis KEYS pattern
761    /// 2. Discovers all matching topics using `discover_topics()`
762    /// 3. Subscribes to each discovered topic with the same consumer group
763    /// 4. Multiplexes messages from all topics into a single receiver
764    ///
765    /// # Note
766    ///
767    /// This performs initial topic discovery using Redis KEYS command. For dynamic
768    /// topic discovery (new topics appearing after subscription), consider polling
769    /// `discover_topics()` periodically and creating new subscriptions.
770    pub async fn subscribe_pattern<T: DeserializeOwned + Send + 'static>(
771        &mut self,
772        pattern: &str,
773        consumer_group: &str,
774    ) -> Result<Subscriber<T>> {
775        // Convert wildcard pattern to Redis KEYS pattern
776        // Example: "*/camera/rgb" -> "mecha10:*/camera/rgb"
777        let full_pattern = format!("{}:{}", self.namespace, pattern);
778
779        // Discover all topics matching the pattern
780        let topics = self.discover_topics(&full_pattern).await?;
781
782        if topics.is_empty() {
783            tracing::warn!(
784                pattern = pattern,
785                "No topics found matching pattern, subscriber will not receive messages until topics are created"
786            );
787        }
788
789        // Create a single channel for all matching topics
790        let (tx, rx) = mpsc::unbounded_channel();
791
792        // Subscribe to each discovered topic
793        for topic in topics {
794            let full_topic = format!("{}:{}", self.namespace, topic);
795            let consumer_id = format!("{}:{}", self.node_id, uuid::Uuid::new_v4());
796
797            // Create consumer group if it doesn't exist
798            let mut conn = self.connection.write().await;
799            let _: std::result::Result<(), RedisError> =
800                conn.xgroup_create_mkstream(&full_topic, consumer_group, "$").await;
801            drop(conn);
802
803            // Spawn background task to read from this stream
804            let redis_url = self.redis_url.clone();
805            let full_topic_clone = full_topic.clone();
806            let consumer_group_clone = consumer_group.to_string();
807            let consumer_id_clone = consumer_id.clone();
808            let tx_clone = tx.clone();
809            let topic_clone = topic.clone();
810
811            let task = tokio::spawn(async move {
812                // Track consecutive connection failures for client recreation
813                let mut consecutive_failures: u32 = 0;
814                const MAX_FAILURES_BEFORE_RECREATE: u32 = 5;
815
816                // Current client - will be recreated if connection driver terminates
817                let mut current_client = match Client::open(redis_url.as_str()) {
818                    Ok(c) => c,
819                    Err(e) => {
820                        tracing::error!(
821                            "❌ Failed to create Redis client for topic '{}': {:?}",
822                            full_topic_clone,
823                            e
824                        );
825                        return;
826                    }
827                };
828
829                // Create a dedicated shared connection for acknowledgments
830                let mut ack_connection: Arc<RwLock<MultiplexedConnection>> =
831                    match current_client.get_multiplexed_async_connection().await {
832                        Ok(conn) => Arc::new(RwLock::new(conn)),
833                        Err(e) => {
834                            tracing::error!(
835                                "❌ Failed to create ack connection for topic '{}': {:?}",
836                                full_topic_clone,
837                                e
838                            );
839                            return;
840                        }
841                    };
842
843                loop {
844                    match current_client.get_multiplexed_async_connection().await {
845                        Ok(mut conn) => {
846                            // Reset failure counter on successful connection
847                            consecutive_failures = 0;
848                            // Read messages from stream
849                            let result: std::result::Result<redis::streams::StreamReadReply, RedisError> = conn
850                                .xread_options(
851                                    &[&full_topic_clone],
852                                    &[">"],
853                                    &redis::streams::StreamReadOptions::default()
854                                        .group(&consumer_group_clone, &consumer_id_clone)
855                                        .count(10)
856                                        .block(1000),
857                                )
858                                .await;
859
860                            match result {
861                                Ok(reply) => {
862                                    for stream_key in reply.keys {
863                                        for stream_id in stream_key.ids {
864                                            // Parse message
865                                            if let Some(redis::Value::BulkString(bytes)) = stream_id.map.get("data") {
866                                                if let Ok(json_str) = std::str::from_utf8(bytes) {
867                                                    if let Ok(envelope) =
868                                                        serde_json::from_str::<serde_json::Value>(json_str)
869                                                    {
870                                                        if let Ok(payload) =
871                                                            serde_json::from_value::<T>(envelope["payload"].clone())
872                                                        {
873                                                            let msg = Message {
874                                                                id: stream_id.id.clone(),
875                                                                topic: topic_clone.clone(),
876                                                                publisher: envelope["publisher"]
877                                                                    .as_str()
878                                                                    .unwrap_or("")
879                                                                    .to_string(),
880                                                                timestamp: envelope["timestamp"].as_u64().unwrap_or(0),
881                                                                payload,
882                                                                ack_connection: Some(ack_connection.clone()),
883                                                                consumer_group: Some(consumer_group_clone.clone()),
884                                                            };
885
886                                                            if tx_clone.send(msg).is_err() {
887                                                                return; // Subscriber dropped
888                                                            }
889                                                        }
890                                                    }
891                                                }
892                                            }
893                                        }
894                                    }
895                                }
896                                Err(_) => {
897                                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
898                                }
899                            }
900                        }
901                        Err(e) => {
902                            consecutive_failures += 1;
903                            tracing::warn!(
904                                "❌ Failed to get Redis connection for topic '{}' (attempt {}): {:?}",
905                                full_topic_clone,
906                                consecutive_failures,
907                                e
908                            );
909
910                            // If we've failed too many times, recreate the client
911                            if consecutive_failures >= MAX_FAILURES_BEFORE_RECREATE {
912                                tracing::warn!(
913                                    "🔄 Recreating Redis client for topic '{}' after {} consecutive failures",
914                                    full_topic_clone,
915                                    consecutive_failures
916                                );
917
918                                match Client::open(redis_url.as_str()) {
919                                    Ok(new_client) => {
920                                        current_client = new_client;
921                                        consecutive_failures = 0;
922
923                                        // Also recreate the ack connection
924                                        match current_client.get_multiplexed_async_connection().await {
925                                            Ok(conn) => {
926                                                ack_connection = Arc::new(RwLock::new(conn));
927                                                tracing::info!(
928                                                    "✅ Successfully recreated Redis client and ack connection for topic '{}'",
929                                                    full_topic_clone
930                                                );
931                                            }
932                                            Err(e) => {
933                                                tracing::error!(
934                                                    "❌ Failed to create new ack connection for topic '{}': {:?}",
935                                                    full_topic_clone,
936                                                    e
937                                                );
938                                            }
939                                        }
940                                    }
941                                    Err(e) => {
942                                        tracing::error!(
943                                            "❌ Failed to recreate Redis client for topic '{}': {:?}",
944                                            full_topic_clone,
945                                            e
946                                        );
947                                    }
948                                }
949                            }
950
951                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
952                        }
953                    }
954                }
955            });
956
957            // Store task handle
958            let mut subs = self.subscriptions.write().await;
959            subs.insert(format!("pattern:{}:{}", pattern, full_topic), task);
960        }
961
962        Ok(Subscriber {
963            rx,
964            topic: pattern.to_string(),
965        })
966    }
967
968    /// Discover topics matching a Redis KEYS pattern
969    ///
970    /// Uses Redis KEYS command to find all streams matching the given pattern.
971    /// This is useful for aggregated topic subscriptions.
972    ///
973    /// # Arguments
974    ///
975    /// * `pattern` - Redis KEYS pattern (e.g., "mecha10:topic:/sensor/camera/*")
976    ///
977    /// # Returns
978    ///
979    /// A vector of topic paths (without the namespace prefix)
980    ///
981    /// # Example
982    ///
983    /// ```rust,no_run
984    /// # async fn example(bus: &mut MessageBus) -> Result<(), Box<dyn std::error::Error>> {
985    /// // Discover all camera topics
986    /// let topics = bus.discover_topics("mecha10:topic:/sensor/camera/*").await?;
987    /// println!("Found {} camera topics", topics.len());
988    /// # Ok(())
989    /// # }
990    /// ```
991    ///
992    /// # Note
993    ///
994    /// This uses Redis KEYS command which can be slow on large datasets.
995    /// For production use with many topics, consider using SCAN instead.
996    pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>> {
997        use redis::AsyncCommands;
998
999        let mut conn = self.connection.write().await;
1000
1001        // Use KEYS command to find matching topics
1002        let keys: Vec<String> = conn.keys(pattern).await.map_err(MessagingError::Redis)?;
1003
1004        // Strip namespace prefix from each key
1005        let prefix = format!("{}:", self.namespace);
1006        let topics: Vec<String> = keys
1007            .into_iter()
1008            .filter_map(|key| key.strip_prefix(&prefix).map(|s| s.to_string()))
1009            .collect();
1010
1011        Ok(topics)
1012    }
1013
1014    /// Unsubscribe from all topics and close connections
1015    pub async fn close(&mut self) -> Result<()> {
1016        let mut subs = self.subscriptions.write().await;
1017        for (_, task) in subs.drain() {
1018            task.abort();
1019        }
1020        Ok(())
1021    }
1022}