codetether_agent/mcp/
bus_bridge.rs1use crate::bus::BusEnvelope;
12use chrono::{DateTime, Utc};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18
/// Number of envelopes the bridge keeps in its in-memory buffer by default.
const DEFAULT_BUFFER_SIZE: usize = 1_000;

/// Pause between the end of one SSE connection attempt and the next one.
const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(3);
24
25#[derive(Debug)]
32pub struct BusBridge {
33 buffer: Arc<RwLock<VecDeque<BusEnvelope>>>,
35 connected: Arc<AtomicBool>,
37 total_received: Arc<AtomicU64>,
39 bus_url: String,
41 capacity: usize,
43}
44
45impl BusBridge {
46 pub fn new(bus_url: String) -> Self {
48 Self {
49 buffer: Arc::new(RwLock::new(VecDeque::with_capacity(DEFAULT_BUFFER_SIZE))),
50 connected: Arc::new(AtomicBool::new(false)),
51 total_received: Arc::new(AtomicU64::new(0)),
52 bus_url,
53 capacity: DEFAULT_BUFFER_SIZE,
54 }
55 }
56
57 pub fn spawn(self) -> Arc<Self> {
61 let bridge = Arc::new(self);
62 let bg = Arc::clone(&bridge);
63 tokio::spawn(async move {
64 bg.reader_loop().await;
65 });
66 bridge
67 }
68
69 pub async fn recent_events(
71 &self,
72 topic_filter: Option<&str>,
73 limit: usize,
74 since: Option<DateTime<Utc>>,
75 ) -> Vec<BusEnvelope> {
76 let buf = self.buffer.read().await;
77 buf.iter()
78 .rev()
79 .filter(|env| {
80 if let Some(filter) = topic_filter {
81 topic_matches(&env.topic, filter)
82 } else {
83 true
84 }
85 })
86 .filter(|env| {
87 if let Some(ts) = since {
88 env.timestamp >= ts
89 } else {
90 true
91 }
92 })
93 .take(limit)
94 .cloned()
95 .collect::<Vec<_>>()
96 .into_iter()
97 .rev() .collect()
99 }
100
101 pub fn status(&self) -> BusBridgeStatus {
103 BusBridgeStatus {
104 connected: self.connected.load(Ordering::Relaxed),
105 total_received: self.total_received.load(Ordering::Relaxed),
106 bus_url: self.bus_url.clone(),
107 buffer_capacity: self.capacity,
108 }
109 }
110
111 pub async fn buffer_len(&self) -> usize {
113 self.buffer.read().await.len()
114 }
115
116 async fn reader_loop(&self) {
121 loop {
122 info!(url = %self.bus_url, "BusBridge: connecting to bus SSE stream");
123 match self.read_sse_stream().await {
124 Ok(()) => {
125 info!("BusBridge: SSE stream closed normally");
126 }
127 Err(e) => {
128 warn!(error = %e, "BusBridge: SSE stream error, reconnecting");
129 }
130 }
131 self.connected.store(false, Ordering::Relaxed);
132 tokio::time::sleep(RECONNECT_DELAY).await;
133 }
134 }
135
136 async fn read_sse_stream(&self) -> anyhow::Result<()> {
138 let client = reqwest::Client::new();
139 let resp = client
140 .get(&self.bus_url)
141 .header("Accept", "text/event-stream")
142 .send()
143 .await?;
144
145 if !resp.status().is_success() {
146 anyhow::bail!("SSE endpoint returned {}", resp.status());
147 }
148
149 self.connected.store(true, Ordering::Relaxed);
150 info!("BusBridge: connected to SSE stream");
151
152 let mut event_type = String::new();
157 let mut data_buf = String::new();
158
159 use futures::StreamExt;
160 let mut byte_stream = resp.bytes_stream();
161
162 let mut line_buf = String::new();
164
165 while let Some(chunk) = byte_stream.next().await {
166 let chunk = chunk?;
167 let text = String::from_utf8_lossy(&chunk);
168
169 for ch in text.chars() {
170 if ch == '\n' {
171 let line = std::mem::take(&mut line_buf);
172 self.process_sse_line(&line, &mut event_type, &mut data_buf)
173 .await;
174 } else {
175 line_buf.push(ch);
176 }
177 }
178 }
179
180 Ok(())
181 }
182
183 async fn process_sse_line(&self, line: &str, event_type: &mut String, data_buf: &mut String) {
185 if line.is_empty() {
186 if event_type == "bus" && !data_buf.is_empty() {
188 match serde_json::from_str::<BusEnvelope>(data_buf) {
189 Ok(envelope) => {
190 self.push_envelope(envelope).await;
191 }
192 Err(e) => {
193 debug!(error = %e, "BusBridge: failed to parse bus envelope");
194 }
195 }
196 }
197 event_type.clear();
198 data_buf.clear();
199 } else if let Some(rest) = line.strip_prefix("event:") {
200 *event_type = rest.trim().to_string();
201 } else if let Some(rest) = line.strip_prefix("data:") {
202 if !data_buf.is_empty() {
203 data_buf.push('\n');
204 }
205 data_buf.push_str(rest.trim());
206 }
207 }
209
210 async fn push_envelope(&self, envelope: BusEnvelope) {
212 let mut buf = self.buffer.write().await;
213 if buf.len() >= self.capacity {
214 buf.pop_front();
215 }
216 buf.push_back(envelope);
217 drop(buf);
218 self.total_received.fetch_add(1, Ordering::Relaxed);
219 }
220}
221
222#[derive(Debug, Clone, serde::Serialize)]
224pub struct BusBridgeStatus {
225 pub connected: bool,
226 pub total_received: u64,
227 pub bus_url: String,
228 pub buffer_capacity: usize,
229}
230
/// Returns `true` when `topic` satisfies `pattern`.
///
/// Supported pattern forms, checked in this order:
///
/// * `*` matches every topic.
/// * `prefix.*` matches `prefix` itself and any topic that continues with a
///   `.`-separated segment (`agent.*` matches `agent` and `agent.1`, but not
///   `agents.1`). A bare `.*` matches every topic.
/// * `.*suffix` matches any topic that ends with `suffix`.
/// * Anything else must equal the topic exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }

    if let Some(prefix) = pattern.strip_suffix(".*") {
        return match topic.strip_prefix(prefix) {
            // Require a segment boundary after the prefix so that `agent.*`
            // does not accidentally match `agents.1`. An empty prefix, or one
            // that already ends in '.', needs no further boundary check.
            Some(rest) => {
                prefix.is_empty()
                    || prefix.ends_with('.')
                    || rest.is_empty()
                    || rest.starts_with('.')
            }
            None => false,
        };
    }

    if let Some(suffix) = pattern.strip_prefix(".*") {
        return topic.ends_with(suffix);
    }

    topic == pattern
}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Global wildcard, prefix wildcard and exact-match patterns.
    #[test]
    fn test_topic_matches() {
        assert!(topic_matches("agent.123.events", "*"));
        assert!(topic_matches("agent.123.events", "agent.*"));
        assert!(topic_matches("ralph.prd1", "ralph.*"));
        assert!(!topic_matches("task.42", "agent.*"));
        assert!(topic_matches("agent.123.events", "agent.123.events"));
    }

    /// Suffix-wildcard patterns and exact patterns that must not match.
    #[test]
    fn test_topic_matches_suffix_and_mismatch() {
        assert!(topic_matches("agent.123.events", ".*.events"));
        assert!(!topic_matches("agent.123.events", ".*.tasks"));
        assert!(!topic_matches("agent.123", "agent.123.events"));
        assert!(!topic_matches("agent.123.events", "agent.123"));
    }
}