mecha10_core/
redis_bridge.rs1use crate::error::{Mecha10Error, Result};
42use glob::Pattern;
43use mecha10_messaging::MessageBus;
44use serde::{Deserialize, Serialize};
45use std::sync::Arc;
46use tokio::sync::RwLock;
47use tokio::task::JoinHandle;
48use tracing::{debug, error, info, warn};
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BridgeConfig {
53 pub enabled: bool,
55
56 pub local_topics: Vec<String>,
59
60 pub bridged_topics: Vec<String>,
63
64 #[serde(default)]
67 pub robot_id: Option<String>,
68}
69
70impl Default for BridgeConfig {
71 fn default() -> Self {
72 Self {
73 enabled: true,
74 local_topics: vec![
75 "/sensor/camera/*".to_string(),
77 "/sensor/lidar/*".to_string(),
78 "/sensor/imu/*".to_string(),
79 ],
80 bridged_topics: vec![
81 "/system/*".to_string(),
83 "/telemetry/*".to_string(),
84 "/logs/*".to_string(),
85 "/fleet/*".to_string(),
86 ],
87 robot_id: None,
88 }
89 }
90}
91
92impl BridgeConfig {
93 pub fn should_bridge(&self, topic: &str) -> bool {
95 if !self.enabled {
96 return false;
97 }
98
99 let matches_bridged = self.bridged_topics.iter().any(|pattern| {
101 if let Ok(glob) = Pattern::new(pattern) {
102 glob.matches(topic)
103 } else {
104 warn!("Invalid glob pattern in bridged_topics: {}", pattern);
105 false
106 }
107 });
108
109 let matches_local = self.local_topics.iter().any(|pattern| {
111 if let Ok(glob) = Pattern::new(pattern) {
112 glob.matches(topic)
113 } else {
114 warn!("Invalid glob pattern in local_topics: {}", pattern);
115 false
116 }
117 });
118
119 matches_bridged && !matches_local
121 }
122
123 pub fn get_control_plane_topic(&self, topic: &str) -> String {
126 if let Some(robot_id) = &self.robot_id {
127 format!("{}{}", robot_id, topic)
128 } else {
129 topic.to_string()
130 }
131 }
132}
133
134struct ForwarderTask {
136 handle: JoinHandle<()>,
137 topic: String,
138}
139
140impl ForwarderTask {
141 fn new(handle: JoinHandle<()>, topic: String) -> Self {
142 Self { handle, topic }
143 }
144
145 fn abort(&self) {
146 self.handle.abort();
147 }
148}
149
150pub struct RedisBridge {
152 config: BridgeConfig,
154
155 local_bus: Arc<RwLock<MessageBus>>,
157
158 control_plane_bus: Arc<RwLock<MessageBus>>,
160
161 tasks: Arc<RwLock<Vec<ForwarderTask>>>,
163
164 shutdown: Arc<RwLock<bool>>,
166}
167
168impl RedisBridge {
169 pub fn new(config: BridgeConfig, local_bus: MessageBus, control_plane_bus: MessageBus) -> Self {
191 Self {
192 config,
193 local_bus: Arc::new(RwLock::new(local_bus)),
194 control_plane_bus: Arc::new(RwLock::new(control_plane_bus)),
195 tasks: Arc::new(RwLock::new(Vec::new())),
196 shutdown: Arc::new(RwLock::new(false)),
197 }
198 }
199
200 pub async fn start(&self) -> Result<()> {
205 if !self.config.enabled {
206 info!("Redis bridge is disabled in configuration");
207 return Ok(());
208 }
209
210 info!(
211 "Starting Redis bridge with {} bridged topic patterns",
212 self.config.bridged_topics.len()
213 );
214
215 for pattern in &self.config.bridged_topics {
216 self.start_forwarder(pattern).await?;
217 }
218
219 info!("Redis bridge started successfully");
220 Ok(())
221 }
222
223 async fn start_forwarder(&self, pattern: &str) -> Result<()> {
225 debug!("Starting forwarder for pattern: {}", pattern);
226
227 let topic = pattern.to_string();
230 let config = self.config.clone();
231 let local_bus = Arc::clone(&self.local_bus);
232 let control_plane_bus = Arc::clone(&self.control_plane_bus);
233 let shutdown = Arc::clone(&self.shutdown);
234
235 let handle = tokio::spawn(async move {
236 if let Err(e) = Self::forward_messages(topic.clone(), config, local_bus, control_plane_bus, shutdown).await
237 {
238 error!("Forwarder task error for topic {}: {}", topic, e);
239 }
240 });
241
242 let mut tasks = self.tasks.write().await;
243 tasks.push(ForwarderTask::new(handle, pattern.to_string()));
244
245 Ok(())
246 }
247
248 #[allow(clippy::too_many_arguments)]
250 async fn forward_messages(
251 topic: String,
252 config: BridgeConfig,
253 local_bus: Arc<RwLock<MessageBus>>,
254 control_plane_bus: Arc<RwLock<MessageBus>>,
255 shutdown: Arc<RwLock<bool>>,
256 ) -> Result<()> {
257 let mut local = local_bus.write().await;
259 let mut subscriber = local
260 .subscribe::<serde_json::Value>(&topic, "bridge")
261 .await
262 .map_err(|e| Mecha10Error::MessagingError {
263 message: format!("Failed to subscribe to topic {}: {}", topic, e),
264 suggestion: "Check local Redis connection".to_string(),
265 })?;
266 drop(local);
267
268 debug!("Subscribed to local topic: {}", topic);
269
270 while !*shutdown.read().await {
272 if let Some(msg) = subscriber.recv().await {
273 debug!("Received message on topic {} from {}", msg.topic, msg.publisher);
274
275 let cp_topic = config.get_control_plane_topic(&topic);
277
278 let mut cp_bus = control_plane_bus.write().await;
280 if let Err(e) = cp_bus.publish(&cp_topic, &msg.payload).await {
281 warn!(
282 "Failed to forward message from {} to control plane topic {}: {}",
283 topic, cp_topic, e
284 );
285 } else {
286 debug!("Forwarded message to control plane topic: {}", cp_topic);
287 }
288 drop(cp_bus);
289
290 if let Err(e) = msg.ack().await {
292 warn!("Failed to acknowledge message on topic {}: {}", topic, e);
293 }
294 }
295 }
296
297 debug!("Forwarder task stopped for topic: {}", topic);
298 Ok(())
299 }
300
301 pub async fn stop(&self) -> Result<()> {
305 info!("Stopping Redis bridge...");
306
307 *self.shutdown.write().await = true;
309
310 let tasks = self.tasks.read().await;
312 for task in tasks.iter() {
313 debug!("Stopping forwarder task for topic: {}", task.topic);
314 task.abort();
315 }
316 drop(tasks);
317
318 self.tasks.write().await.clear();
320
321 info!("Redis bridge stopped");
322 Ok(())
323 }
324
325 pub async fn is_running(&self) -> bool {
327 !*self.shutdown.read().await
328 }
329
330 pub async fn active_tasks(&self) -> usize {
332 self.tasks.read().await.len()
333 }
334}