Skip to main content

codetether_agent/mcp/
bus_bridge.rs

1//! MCP-to-Bus bridge
2//!
3//! Connects to the HTTP server's `/v1/bus/stream` SSE endpoint and buffers
4//! recent [`BusEnvelope`]s in a ring buffer.  The MCP server exposes this
5//! data through tools (`bus_events`, `bus_status`) and resources
6//! (`codetether://bus/events/recent`).
7//!
8//! The bridge runs as a background tokio task and reconnects automatically
9//! on transient failures.
10
11use crate::bus::BusEnvelope;
12use chrono::{DateTime, Utc};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18
/// Upper bound on how many envelopes the ring buffer retains at once.
const DEFAULT_BUFFER_SIZE: usize = 1_000;

/// Pause between the end of one SSE connection (clean close or error) and
/// the next connection attempt.
const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(3);
24
25// ─── BusBridge ───────────────────────────────────────────────────────────
26
27/// A read-only bridge from the HTTP bus SSE stream into the MCP process.
28///
29/// Call [`BusBridge::spawn`] to start the background reader, then query via
30/// [`BusBridge::recent_events`] or [`BusBridge::status`].
31#[derive(Debug)]
32pub struct BusBridge {
33    /// Ring buffer of recent envelopes (newest last).
34    buffer: Arc<RwLock<VecDeque<BusEnvelope>>>,
35    /// Whether the SSE reader is currently connected.
36    connected: Arc<AtomicBool>,
37    /// Total number of envelopes received since start.
38    total_received: Arc<AtomicU64>,
39    /// URL we connect to.
40    bus_url: String,
41    /// Max buffer capacity.
42    capacity: usize,
43}
44
45impl BusBridge {
46    /// Create a new bridge (does **not** start the background task).
47    pub fn new(bus_url: String) -> Self {
48        Self {
49            buffer: Arc::new(RwLock::new(VecDeque::with_capacity(DEFAULT_BUFFER_SIZE))),
50            connected: Arc::new(AtomicBool::new(false)),
51            total_received: Arc::new(AtomicU64::new(0)),
52            bus_url,
53            capacity: DEFAULT_BUFFER_SIZE,
54        }
55    }
56
57    /// Spawn the SSE reader as a background tokio task.
58    ///
59    /// Returns `Self` wrapped in an `Arc` for sharing with tool handlers.
60    pub fn spawn(self) -> Arc<Self> {
61        let bridge = Arc::new(self);
62        let bg = Arc::clone(&bridge);
63        tokio::spawn(async move {
64            bg.reader_loop().await;
65        });
66        bridge
67    }
68
69    /// Query recent events with optional topic filter and limit.
70    pub async fn recent_events(
71        &self,
72        topic_filter: Option<&str>,
73        limit: usize,
74        since: Option<DateTime<Utc>>,
75    ) -> Vec<BusEnvelope> {
76        let buf = self.buffer.read().await;
77        buf.iter()
78            .rev()
79            .filter(|env| {
80                if let Some(filter) = topic_filter {
81                    topic_matches(&env.topic, filter)
82                } else {
83                    true
84                }
85            })
86            .filter(|env| {
87                if let Some(ts) = since {
88                    env.timestamp >= ts
89                } else {
90                    true
91                }
92            })
93            .take(limit)
94            .cloned()
95            .collect::<Vec<_>>()
96            .into_iter()
97            .rev() // restore chronological order
98            .collect()
99    }
100
101    /// Current bridge status summary (JSON-friendly).
102    pub fn status(&self) -> BusBridgeStatus {
103        BusBridgeStatus {
104            connected: self.connected.load(Ordering::Relaxed),
105            total_received: self.total_received.load(Ordering::Relaxed),
106            bus_url: self.bus_url.clone(),
107            buffer_capacity: self.capacity,
108        }
109    }
110
111    /// Buffer size (number of envelopes currently held).
112    pub async fn buffer_len(&self) -> usize {
113        self.buffer.read().await.len()
114    }
115
116    // ── internal ──────────────────────────────────────────────────────
117
118    /// Background loop: connect to SSE, read envelopes, push to buffer.
119    /// Reconnects on failure.
120    async fn reader_loop(&self) {
121        loop {
122            info!(url = %self.bus_url, "BusBridge: connecting to bus SSE stream");
123            match self.read_sse_stream().await {
124                Ok(()) => {
125                    info!("BusBridge: SSE stream closed normally");
126                }
127                Err(e) => {
128                    warn!(error = %e, "BusBridge: SSE stream error, reconnecting");
129                }
130            }
131            self.connected.store(false, Ordering::Relaxed);
132            tokio::time::sleep(RECONNECT_DELAY).await;
133        }
134    }
135
136    /// Single SSE connection attempt.  Reads until the stream closes or errors.
137    async fn read_sse_stream(&self) -> anyhow::Result<()> {
138        let client = reqwest::Client::new();
139        let resp = client
140            .get(&self.bus_url)
141            .header("Accept", "text/event-stream")
142            .send()
143            .await?;
144
145        if !resp.status().is_success() {
146            anyhow::bail!("SSE endpoint returned {}", resp.status());
147        }
148
149        self.connected.store(true, Ordering::Relaxed);
150        info!("BusBridge: connected to SSE stream");
151
152        // Read line-by-line.  SSE format:
153        //   event: <type>\n
154        //   data: <json>\n
155        //   \n
156        let mut event_type = String::new();
157        let mut data_buf = String::new();
158
159        use futures::StreamExt;
160        let mut byte_stream = resp.bytes_stream();
161
162        // Accumulate raw bytes into lines
163        let mut line_buf = String::new();
164
165        while let Some(chunk) = byte_stream.next().await {
166            let chunk = chunk?;
167            let text = String::from_utf8_lossy(&chunk);
168
169            for ch in text.chars() {
170                if ch == '\n' {
171                    let line = std::mem::take(&mut line_buf);
172                    self.process_sse_line(&line, &mut event_type, &mut data_buf)
173                        .await;
174                } else {
175                    line_buf.push(ch);
176                }
177            }
178        }
179
180        Ok(())
181    }
182
183    /// Process a single SSE text line.
184    async fn process_sse_line(&self, line: &str, event_type: &mut String, data_buf: &mut String) {
185        if line.is_empty() {
186            // Empty line = end of event
187            if event_type == "bus" && !data_buf.is_empty() {
188                match serde_json::from_str::<BusEnvelope>(data_buf) {
189                    Ok(envelope) => {
190                        self.push_envelope(envelope).await;
191                    }
192                    Err(e) => {
193                        debug!(error = %e, "BusBridge: failed to parse bus envelope");
194                    }
195                }
196            }
197            event_type.clear();
198            data_buf.clear();
199        } else if let Some(rest) = line.strip_prefix("event:") {
200            *event_type = rest.trim().to_string();
201        } else if let Some(rest) = line.strip_prefix("data:") {
202            if !data_buf.is_empty() {
203                data_buf.push('\n');
204            }
205            data_buf.push_str(rest.trim());
206        }
207        // Ignore comment lines (`:`) and other prefixes
208    }
209
210    /// Push an envelope into the ring buffer, evicting oldest if full.
211    async fn push_envelope(&self, envelope: BusEnvelope) {
212        let mut buf = self.buffer.write().await;
213        if buf.len() >= self.capacity {
214            buf.pop_front();
215        }
216        buf.push_back(envelope);
217        drop(buf);
218        self.total_received.fetch_add(1, Ordering::Relaxed);
219    }
220}
221
222/// Status snapshot returned by [`BusBridge::status`].
223#[derive(Debug, Clone, serde::Serialize)]
224pub struct BusBridgeStatus {
225    pub connected: bool,
226    pub total_received: u64,
227    pub bus_url: String,
228    pub buffer_capacity: usize,
229}
230
231// ─── Helpers ─────────────────────────────────────────────────────────────
232
/// Decide whether `topic` satisfies `pattern` (mirrors
/// `server/mod.rs::topic_matches`).
///
/// Rules, checked in this order:
///
/// 1. `*` matches every topic.
/// 2. A pattern ending in `.*` matches topics that start with the text
///    before the `.*`.
/// 3. A pattern starting with `.*` matches topics that end with the text
///    after the `.*`.
/// 4. Any other pattern must equal the topic exactly.
///
/// Note that the dot of a `.*` wildcard is not itself required in the
/// topic: `agent.*` matches `agentx` as well as `agent.x`.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        true
    } else if let Some(head) = pattern.strip_suffix(".*") {
        topic.starts_with(head)
    } else if let Some(tail) = pattern.strip_prefix(".*") {
        topic.ends_with(tail)
    } else {
        topic == pattern
    }
}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Catch-all, prefix-wildcard and exact patterns.
    #[test]
    fn test_topic_matches() {
        assert!(topic_matches("agent.123.events", "*"));
        assert!(topic_matches("agent.123.events", "agent.*"));
        assert!(topic_matches("ralph.prd1", "ralph.*"));
        assert!(!topic_matches("task.42", "agent.*"));
        assert!(topic_matches("agent.123.events", "agent.123.events"));
    }

    /// Patterns that begin with `.*` match on the topic's suffix.
    #[test]
    fn test_topic_matches_suffix_pattern() {
        assert!(topic_matches("agent.123.events", ".*.events"));
        assert!(topic_matches("agent.123.events", ".*events"));
        assert!(!topic_matches("agent.123.status", ".*.events"));
    }

    /// Without a wildcard the pattern must equal the topic exactly.
    #[test]
    fn test_topic_matches_exact_requires_equality() {
        assert!(!topic_matches("agent.123.events", "agent.123"));
        assert!(!topic_matches("agent.123", "agent.123.events"));
        assert!(!topic_matches("", "agent"));
    }

    /// The catch-all pattern accepts even an empty topic.
    #[test]
    fn test_topic_matches_star_accepts_empty_topic() {
        assert!(topic_matches("", "*"));
    }
}