mecha10_core/
redis_bridge.rs

1//! Redis Bridge
2//!
3//! Forwards messages between local Redis (on-robot) and control plane Redis (cloud).
4//! This enables K8s-like distributed architecture where nodes can be scheduled
5//! on robots or in the cloud.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Robot:
11//!   Local Redis (localhost:6379)
12//!     ↓↑ (bridged topics)
13//!   RedisBridge
14//!     ↓↑ (network)
15//! Cloud:
16//!   Control Plane Redis (shared across fleet)
17//!     ↓↑
18//!   Remote Nodes (K8s deployments)
19//! ```
20//!
21//! # Topic Routing
22//!
23//! - **Local topics**: Stay on local Redis (e.g., `/sensor/camera/rgb` - high frequency)
24//! - **Bridged topics**: Forwarded to control plane (e.g., `/system/heartbeat`, `/telemetry/*`)
25//! - **Wildcard subscriptions**: Remote nodes can subscribe to `*/camera/rgb` (all robots)
26//!
27//! # Configuration
28//!
29//! ```yaml
30//! bridge:
31//!   enabled: true
32//!   local_topics:
33//!     - "/sensor/camera/*"     # Keep local (high frequency)
34//!     - "/sensor/lidar/*"      # Keep local
35//!   bridged_topics:
36//!     - "/system/*"            # Bridge to cloud
37//!     - "/telemetry/*"         # Bridge to cloud
38//!     - "/logs/*"              # Bridge to cloud
39//! ```
40
41use crate::error::{Mecha10Error, Result};
42use glob::Pattern;
43use mecha10_messaging::MessageBus;
44use serde::{Deserialize, Serialize};
45use std::sync::Arc;
46use tokio::sync::RwLock;
47use tokio::task::JoinHandle;
48use tracing::{debug, error, info, warn};
49
50/// Bridge configuration
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BridgeConfig {
53    /// Enable or disable the bridge
54    pub enabled: bool,
55
56    /// Topic patterns that should stay local (not bridged)
57    /// Examples: "/sensor/camera/*", "/sensor/lidar/*"
58    pub local_topics: Vec<String>,
59
60    /// Topic patterns that should be bridged to control plane
61    /// Examples: "/system/*", "/telemetry/*", "/logs/*"
62    pub bridged_topics: Vec<String>,
63
64    /// Robot identifier for wildcard subscriptions
65    /// Used in control plane for topic prefixing (e.g., "robot-123/camera/rgb")
66    #[serde(default)]
67    pub robot_id: Option<String>,
68}
69
70impl Default for BridgeConfig {
71    fn default() -> Self {
72        Self {
73            enabled: true,
74            local_topics: vec![
75                // High-frequency sensor data stays local
76                "/sensor/camera/*".to_string(),
77                "/sensor/lidar/*".to_string(),
78                "/sensor/imu/*".to_string(),
79            ],
80            bridged_topics: vec![
81                // System topics go to control plane
82                "/system/*".to_string(),
83                "/telemetry/*".to_string(),
84                "/logs/*".to_string(),
85                "/fleet/*".to_string(),
86            ],
87            robot_id: None,
88        }
89    }
90}
91
92impl BridgeConfig {
93    /// Check if a topic should be bridged to control plane
94    pub fn should_bridge(&self, topic: &str) -> bool {
95        if !self.enabled {
96            return false;
97        }
98
99        // Check if topic matches any bridged patterns
100        let matches_bridged = self.bridged_topics.iter().any(|pattern| {
101            if let Ok(glob) = Pattern::new(pattern) {
102                glob.matches(topic)
103            } else {
104                warn!("Invalid glob pattern in bridged_topics: {}", pattern);
105                false
106            }
107        });
108
109        // Check if topic matches any local-only patterns (override)
110        let matches_local = self.local_topics.iter().any(|pattern| {
111            if let Ok(glob) = Pattern::new(pattern) {
112                glob.matches(topic)
113            } else {
114                warn!("Invalid glob pattern in local_topics: {}", pattern);
115                false
116            }
117        });
118
119        // Bridge if matches bridged pattern and doesn't match local pattern
120        matches_bridged && !matches_local
121    }
122
123    /// Get the prefixed topic for control plane
124    /// Example: "/camera/rgb" → "robot-123/camera/rgb"
125    pub fn get_control_plane_topic(&self, topic: &str) -> String {
126        if let Some(robot_id) = &self.robot_id {
127            format!("{}{}", robot_id, topic)
128        } else {
129            topic.to_string()
130        }
131    }
132}
133
134/// Message forwarder task
135struct ForwarderTask {
136    handle: JoinHandle<()>,
137    topic: String,
138}
139
140impl ForwarderTask {
141    fn new(handle: JoinHandle<()>, topic: String) -> Self {
142        Self { handle, topic }
143    }
144
145    fn abort(&self) {
146        self.handle.abort();
147    }
148}
149
150/// Redis bridge for forwarding messages between local and control plane Redis
151pub struct RedisBridge {
152    /// Bridge configuration
153    config: BridgeConfig,
154
155    /// Local message bus (on-robot)
156    local_bus: Arc<RwLock<MessageBus>>,
157
158    /// Control plane message bus (cloud)
159    control_plane_bus: Arc<RwLock<MessageBus>>,
160
161    /// Running forwarder tasks
162    tasks: Arc<RwLock<Vec<ForwarderTask>>>,
163
164    /// Shutdown signal
165    shutdown: Arc<RwLock<bool>>,
166}
167
168impl RedisBridge {
169    /// Create a new Redis bridge
170    ///
171    /// # Arguments
172    ///
173    /// * `config` - Bridge configuration
174    /// * `local_bus` - Local Redis message bus
175    /// * `control_plane_bus` - Control plane Redis message bus
176    ///
177    /// # Example
178    ///
179    /// ```rust,no_run
180    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
181    /// let config = BridgeConfig::default();
182    /// let local_bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
183    /// let cp_bus = MessageBus::connect("redis://control-plane:6379", "robot-1").await?;
184    ///
185    /// let bridge = RedisBridge::new(config, local_bus, cp_bus);
186    /// bridge.start().await?;
187    /// # Ok(())
188    /// # }
189    /// ```
190    pub fn new(config: BridgeConfig, local_bus: MessageBus, control_plane_bus: MessageBus) -> Self {
191        Self {
192            config,
193            local_bus: Arc::new(RwLock::new(local_bus)),
194            control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
195            tasks: Arc::new(RwLock::new(Vec::new())),
196            shutdown: Arc::new(RwLock::new(false)),
197        }
198    }
199
200    /// Start the bridge
201    ///
202    /// Spawns forwarder tasks for each bridged topic pattern.
203    /// Messages matching bridged patterns will be forwarded from local to control plane.
204    pub async fn start(&self) -> Result<()> {
205        if !self.config.enabled {
206            info!("Redis bridge is disabled in configuration");
207            return Ok(());
208        }
209
210        info!(
211            "Starting Redis bridge with {} bridged topic patterns",
212            self.config.bridged_topics.len()
213        );
214
215        for pattern in &self.config.bridged_topics {
216            self.start_forwarder(pattern).await?;
217        }
218
219        info!("Redis bridge started successfully");
220        Ok(())
221    }
222
223    /// Start a forwarder task for a specific topic pattern
224    async fn start_forwarder(&self, pattern: &str) -> Result<()> {
225        debug!("Starting forwarder for pattern: {}", pattern);
226
227        // For now, we'll implement simple wildcard forwarding
228        // In production, this would discover actual topics matching the pattern
229        let topic = pattern.to_string();
230        let config = self.config.clone();
231        let local_bus = Arc::clone(&self.local_bus);
232        let control_plane_bus = Arc::clone(&self.control_plane_bus);
233        let shutdown = Arc::clone(&self.shutdown);
234
235        let handle = tokio::spawn(async move {
236            if let Err(e) = Self::forward_messages(topic.clone(), config, local_bus, control_plane_bus, shutdown).await
237            {
238                error!("Forwarder task error for topic {}: {}", topic, e);
239            }
240        });
241
242        let mut tasks = self.tasks.write().await;
243        tasks.push(ForwarderTask::new(handle, pattern.to_string()));
244
245        Ok(())
246    }
247
248    /// Forward messages from local to control plane
249    #[allow(clippy::too_many_arguments)]
250    async fn forward_messages(
251        topic: String,
252        config: BridgeConfig,
253        local_bus: Arc<RwLock<MessageBus>>,
254        control_plane_bus: Arc<RwLock<MessageBus>>,
255        shutdown: Arc<RwLock<bool>>,
256    ) -> Result<()> {
257        // Subscribe to the topic on local Redis
258        let mut local = local_bus.write().await;
259        let mut subscriber = local
260            .subscribe::<serde_json::Value>(&topic, "bridge")
261            .await
262            .map_err(|e| Mecha10Error::MessagingError {
263                message: format!("Failed to subscribe to topic {}: {}", topic, e),
264                suggestion: "Check local Redis connection".to_string(),
265            })?;
266        drop(local);
267
268        debug!("Subscribed to local topic: {}", topic);
269
270        // Forward messages until shutdown
271        while !*shutdown.read().await {
272            if let Some(msg) = subscriber.recv().await {
273                debug!("Received message on topic {} from {}", msg.topic, msg.publisher);
274
275                // Get the control plane topic (with robot prefix if configured)
276                let cp_topic = config.get_control_plane_topic(&topic);
277
278                // Forward to control plane
279                let mut cp_bus = control_plane_bus.write().await;
280                if let Err(e) = cp_bus.publish(&cp_topic, &msg.payload).await {
281                    warn!(
282                        "Failed to forward message from {} to control plane topic {}: {}",
283                        topic, cp_topic, e
284                    );
285                } else {
286                    debug!("Forwarded message to control plane topic: {}", cp_topic);
287                }
288                drop(cp_bus);
289
290                // Acknowledge the message on local Redis
291                if let Err(e) = msg.ack().await {
292                    warn!("Failed to acknowledge message on topic {}: {}", topic, e);
293                }
294            }
295        }
296
297        debug!("Forwarder task stopped for topic: {}", topic);
298        Ok(())
299    }
300
301    /// Stop the bridge gracefully
302    ///
303    /// Stops all forwarder tasks and waits for them to complete.
304    pub async fn stop(&self) -> Result<()> {
305        info!("Stopping Redis bridge...");
306
307        // Set shutdown signal
308        *self.shutdown.write().await = true;
309
310        // Abort all tasks
311        let tasks = self.tasks.read().await;
312        for task in tasks.iter() {
313            debug!("Stopping forwarder task for topic: {}", task.topic);
314            task.abort();
315        }
316        drop(tasks);
317
318        // Clear tasks
319        self.tasks.write().await.clear();
320
321        info!("Redis bridge stopped");
322        Ok(())
323    }
324
325    /// Check if the bridge is running
326    pub async fn is_running(&self) -> bool {
327        !*self.shutdown.read().await
328    }
329
330    /// Get the number of active forwarder tasks
331    pub async fn active_tasks(&self) -> usize {
332        self.tasks.read().await.len()
333    }
334}