Skip to main content

codetether_agent/mcp/
bus_bridge.rs

1//! MCP-to-Bus bridge
2//!
3//! Connects to the HTTP server's `/v1/bus/stream` SSE endpoint and buffers
4//! recent [`BusEnvelope`]s in a ring buffer.  The MCP server exposes this
5//! data through tools (`bus_events`, `bus_status`) and resources
6//! (`codetether://bus/events/recent`).
7//!
8//! The bridge runs as a background tokio task and reconnects automatically
9//! on transient failures.
10
11use crate::bus::BusEnvelope;
12use chrono::{DateTime, Utc};
13use serde::Deserialize;
14use std::collections::VecDeque;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use tokio::sync::RwLock;
19use tracing::{debug, info, warn};
20
/// Upper bound on how many envelopes the bridge retains; once the ring
/// buffer holds this many, the oldest entry is evicted for each new one.
const DEFAULT_BUFFER_SIZE: usize = 1_000;

/// How long the reader waits after an SSE connection ends (cleanly or with
/// an error) before it attempts to connect again.
const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(3);
26
27// ─── BusBridge ───────────────────────────────────────────────────────────
28
29/// A read-only bridge from the HTTP bus SSE stream into the MCP process.
30///
31/// Call [`BusBridge::spawn`] to start the background reader, then query via
32/// [`BusBridge::recent_events`] or [`BusBridge::status`].
33#[derive(Debug)]
34pub struct BusBridge {
35    /// Ring buffer of recent envelopes (newest last).
36    buffer: Arc<RwLock<VecDeque<BusEnvelope>>>,
37    /// Whether the SSE reader is currently connected.
38    connected: Arc<AtomicBool>,
39    /// Total number of envelopes received since start.
40    total_received: Arc<AtomicU64>,
41    /// URL we connect to.
42    bus_url: String,
43    /// Optional bearer token for authenticated bus endpoints.
44    auth_token: Option<String>,
45    /// Max buffer capacity.
46    capacity: usize,
47}
48
49impl BusBridge {
50    /// Create a new bridge (does **not** start the background task).
51    pub fn new(bus_url: String) -> Self {
52        Self::with_auth(bus_url, None)
53    }
54
55    /// Create a new bridge with an optional bearer token.
56    pub fn with_auth(bus_url: String, auth_token: Option<String>) -> Self {
57        Self {
58            buffer: Arc::new(RwLock::new(VecDeque::with_capacity(DEFAULT_BUFFER_SIZE))),
59            connected: Arc::new(AtomicBool::new(false)),
60            total_received: Arc::new(AtomicU64::new(0)),
61            bus_url,
62            auth_token,
63            capacity: DEFAULT_BUFFER_SIZE,
64        }
65    }
66
67    /// Spawn the SSE reader as a background tokio task.
68    ///
69    /// Returns `Self` wrapped in an `Arc` for sharing with tool handlers.
70    pub fn spawn(self) -> Arc<Self> {
71        let bridge = Arc::new(self);
72        let bg = Arc::clone(&bridge);
73        tokio::spawn(async move {
74            bg.reader_loop().await;
75        });
76        bridge
77    }
78
79    /// Query recent events with optional topic filter and limit.
80    pub async fn recent_events(
81        &self,
82        topic_filter: Option<&str>,
83        limit: usize,
84        since: Option<DateTime<Utc>>,
85    ) -> Vec<BusEnvelope> {
86        let buf = self.buffer.read().await;
87        buf.iter()
88            .rev()
89            .filter(|env| {
90                if let Some(filter) = topic_filter {
91                    topic_matches(&env.topic, filter)
92                } else {
93                    true
94                }
95            })
96            .filter(|env| {
97                if let Some(ts) = since {
98                    env.timestamp >= ts
99                } else {
100                    true
101                }
102            })
103            .take(limit)
104            .cloned()
105            .collect::<Vec<_>>()
106            .into_iter()
107            .rev() // restore chronological order
108            .collect()
109    }
110
111    /// Current bridge status summary (JSON-friendly).
112    pub fn status(&self) -> BusBridgeStatus {
113        BusBridgeStatus {
114            connected: self.connected.load(Ordering::Relaxed),
115            total_received: self.total_received.load(Ordering::Relaxed),
116            bus_url: self.bus_url.clone(),
117            buffer_capacity: self.capacity,
118        }
119    }
120
121    /// Buffer size (number of envelopes currently held).
122    pub async fn buffer_len(&self) -> usize {
123        self.buffer.read().await.len()
124    }
125
126    // ── internal ──────────────────────────────────────────────────────
127
128    /// Background loop: connect to SSE, read envelopes, push to buffer.
129    /// Reconnects on failure.
130    async fn reader_loop(&self) {
131        loop {
132            info!(url = %self.bus_url, "BusBridge: connecting to bus SSE stream");
133            match self.read_sse_stream().await {
134                Ok(()) => {
135                    info!("BusBridge: SSE stream closed normally");
136                }
137                Err(e) => {
138                    warn!(error = %e, "BusBridge: SSE stream error, reconnecting");
139                }
140            }
141            self.connected.store(false, Ordering::Relaxed);
142            tokio::time::sleep(RECONNECT_DELAY).await;
143        }
144    }
145
146    /// Single SSE connection attempt.  Reads until the stream closes or errors.
147    async fn read_sse_stream(&self) -> anyhow::Result<()> {
148        let client = reqwest::Client::new();
149        let mut req = client
150            .get(&self.bus_url)
151            .header("Accept", "text/event-stream");
152        if let Some(token) = self
153            .auth_token
154            .as_deref()
155            .filter(|value| !value.trim().is_empty())
156        {
157            req = req.bearer_auth(token);
158        }
159        let resp = req.send().await?;
160
161        if !resp.status().is_success() {
162            anyhow::bail!("SSE endpoint returned {}", resp.status());
163        }
164
165        self.connected.store(true, Ordering::Relaxed);
166        info!("BusBridge: connected to SSE stream");
167
168        // Read line-by-line.  SSE format:
169        //   event: <type>\n
170        //   data: <json>\n
171        //   \n
172        let mut event_type = String::new();
173        let mut data_buf = String::new();
174
175        use futures::StreamExt;
176        let mut byte_stream = resp.bytes_stream();
177
178        // Accumulate raw bytes into lines
179        let mut line_buf = String::new();
180
181        while let Some(chunk) = byte_stream.next().await {
182            let chunk = chunk?;
183            let text = String::from_utf8_lossy(&chunk);
184
185            for ch in text.chars() {
186                if ch == '\n' {
187                    let line = std::mem::take(&mut line_buf);
188                    self.process_sse_line(&line, &mut event_type, &mut data_buf)
189                        .await;
190                } else {
191                    line_buf.push(ch);
192                }
193            }
194        }
195
196        Ok(())
197    }
198
199    /// Process a single SSE text line.
200    async fn process_sse_line(&self, line: &str, event_type: &mut String, data_buf: &mut String) {
201        if line.is_empty() {
202            // Empty line = end of event
203            if event_type == "bus" && !data_buf.is_empty() {
204                match serde_json::from_str::<BusEnvelope>(data_buf) {
205                    Ok(envelope) => {
206                        self.push_envelope(envelope).await;
207                    }
208                    Err(e) => {
209                        debug!(error = %e, "BusBridge: failed to parse bus envelope");
210                    }
211                }
212            }
213            event_type.clear();
214            data_buf.clear();
215        } else if let Some(rest) = line.strip_prefix("event:") {
216            *event_type = rest.trim().to_string();
217        } else if let Some(rest) = line.strip_prefix("data:") {
218            if !data_buf.is_empty() {
219                data_buf.push('\n');
220            }
221            data_buf.push_str(rest.trim());
222        }
223        // Ignore comment lines (`:`) and other prefixes
224    }
225
226    /// Push an envelope into the ring buffer, evicting oldest if full.
227    async fn push_envelope(&self, envelope: BusEnvelope) {
228        let mut buf = self.buffer.write().await;
229        if buf.len() >= self.capacity {
230            buf.pop_front();
231        }
232        buf.push_back(envelope);
233        drop(buf);
234        self.total_received.fetch_add(1, Ordering::Relaxed);
235    }
236}
237
238/// Resolve a worker's first-class bus connection via the control plane.
239pub async fn resolve_worker_bus_url(
240    control_plane_url: &str,
241    worker_id: &str,
242    token: Option<&str>,
243) -> anyhow::Result<String> {
244    let worker_url = format!(
245        "{}/v1/agent/workers/{}",
246        control_plane_url.trim_end_matches('/'),
247        worker_id
248    );
249
250    let client = reqwest::Client::new();
251    let mut req = client.get(worker_url);
252    if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
253        req = req.bearer_auth(token);
254    }
255
256    let worker: serde_json::Value = req.send().await?.error_for_status()?.json().await?;
257
258    let has_bus_interface = worker
259        .get("interfaces")
260        .and_then(|value| value.get("bus"))
261        .and_then(|value| value.get("stream_url"))
262        .and_then(|value| value.as_str())
263        .is_some();
264    let has_http_interface = worker
265        .get("interfaces")
266        .and_then(|value| value.get("http"))
267        .and_then(|value| value.get("base_url"))
268        .and_then(|value| value.as_str())
269        .is_some();
270
271    if has_bus_interface || has_http_interface {
272        return Ok(format!(
273            "{}/v1/agent/workers/{}/bus/stream",
274            control_plane_url.trim_end_matches('/'),
275            worker_id
276        ));
277    }
278
279    anyhow::bail!(
280        "Worker '{}' does not advertise a first-class bus interface",
281        worker_id
282    )
283}
284
285#[derive(Debug, Deserialize)]
286struct WorkspaceSummary {
287    id: String,
288    path: Option<String>,
289}
290
291#[derive(Debug, Deserialize)]
292struct WorkspaceDetails {
293    worker_id: Option<String>,
294}
295
296pub async fn resolve_worker_bus_url_for_workspace(
297    control_plane_url: &str,
298    workspace_id: &str,
299    token: Option<&str>,
300) -> anyhow::Result<String> {
301    let workspace_url = format!(
302        "{}/v1/agent/workspaces/{}",
303        control_plane_url.trim_end_matches('/'),
304        urlencoding::encode(workspace_id)
305    );
306
307    let client = reqwest::Client::new();
308    let mut req = client.get(workspace_url);
309    if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
310        req = req.bearer_auth(token);
311    }
312
313    let workspace: WorkspaceDetails = req.send().await?.error_for_status()?.json().await?;
314    let worker_id = workspace
315        .worker_id
316        .as_deref()
317        .map(str::trim)
318        .filter(|value| !value.is_empty())
319        .ok_or_else(|| {
320            anyhow::anyhow!(
321                "Workspace '{}' is not currently assigned to a worker; pass --worker-id or register the workspace on a worker",
322                workspace_id
323            )
324        })?;
325
326    resolve_worker_bus_url(control_plane_url, worker_id, token).await
327}
328
329pub async fn resolve_workspace_id_from_path(
330    control_plane_url: &str,
331    workspace_root: &Path,
332    token: Option<&str>,
333) -> anyhow::Result<Option<String>> {
334    let workspace_root = normalize_local_path(workspace_root)?;
335    let workspace_root = workspace_root.to_string_lossy().to_string();
336    let workspaces_url = format!(
337        "{}/v1/agent/workspaces",
338        control_plane_url.trim_end_matches('/')
339    );
340
341    let client = reqwest::Client::new();
342    let mut req = client.get(workspaces_url);
343    if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
344        req = req.bearer_auth(token);
345    }
346
347    let workspaces: Vec<WorkspaceSummary> = req.send().await?.error_for_status()?.json().await?;
348    Ok(best_workspace_match(&workspace_root, &workspaces).map(|workspace| workspace.id.clone()))
349}
350
351/// Resolve the default worker bus connection when the control plane has exactly
352/// one active worker advertising a first-class bus/http interface.
353pub async fn resolve_default_worker_bus_url(
354    control_plane_url: &str,
355    token: Option<&str>,
356) -> anyhow::Result<String> {
357    let workers_url = format!(
358        "{}/v1/agent/workers",
359        control_plane_url.trim_end_matches('/')
360    );
361
362    let client = reqwest::Client::new();
363    let mut req = client.get(workers_url);
364    if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
365        req = req.bearer_auth(token);
366    }
367
368    let workers: Vec<serde_json::Value> = req.send().await?.error_for_status()?.json().await?;
369
370    let candidates = workers
371        .into_iter()
372        .filter(|worker| {
373            let status = worker
374                .get("status")
375                .and_then(|value| value.as_str())
376                .unwrap_or_default();
377            status == "active"
378                && worker
379                    .get("interfaces")
380                    .and_then(|value| value.as_object())
381                    .map(|value| !value.is_empty())
382                    .unwrap_or(false)
383        })
384        .collect::<Vec<_>>();
385
386    match candidates.as_slice() {
387        [worker] => {
388            let worker_id = worker
389                .get("worker_id")
390                .and_then(|value| value.as_str())
391                .ok_or_else(|| anyhow::anyhow!("Active worker is missing worker_id"))?;
392            resolve_worker_bus_url(control_plane_url, worker_id, token).await
393        }
394        [] => anyhow::bail!(
395            "No active workers with first-class interfaces were found; deploy/register a worker or provide --worker-id"
396        ),
397        workers => {
398            let worker_ids = workers
399                .iter()
400                .filter_map(|worker| worker.get("worker_id").and_then(|value| value.as_str()))
401                .collect::<Vec<_>>()
402                .join(", ");
403            anyhow::bail!(
404                "Multiple active workers are registered ({worker_ids}); provide --worker-id to choose one"
405            )
406        }
407    }
408}
409
410/// Status snapshot returned by [`BusBridge::status`].
411#[derive(Debug, Clone, serde::Serialize)]
412pub struct BusBridgeStatus {
413    pub connected: bool,
414    pub total_received: u64,
415    pub bus_url: String,
416    pub buffer_capacity: usize,
417}
418
419// ─── Helpers ─────────────────────────────────────────────────────────────
420
/// Decide whether `topic` satisfies `pattern` (mirrors
/// `server/mod.rs::topic_matches`).
///
/// Rules, checked in this order:
/// * `*` matches every topic.
/// * `<prefix>.*` matches any topic that starts with `<prefix>`.  Note that
///   no `.` boundary is required after the prefix.
/// * `.*<suffix>` matches any topic that ends with `<suffix>`.
/// * Anything else must equal the topic exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    match pattern {
        "*" => true,
        other => match (other.strip_suffix(".*"), other.strip_prefix(".*")) {
            // Trailing wildcard wins when a pattern has both forms.
            (Some(head), _) => topic.starts_with(head),
            (None, Some(tail)) => topic.ends_with(tail),
            (None, None) => topic == other,
        },
    }
}
434
435fn normalize_local_path(path: &Path) -> anyhow::Result<PathBuf> {
436    if path.is_absolute() {
437        return Ok(path.to_path_buf());
438    }
439
440    Ok(std::env::current_dir()?.join(path))
441}
442
443fn best_workspace_match<'a>(
444    workspace_root: &str,
445    workspaces: &'a [WorkspaceSummary],
446) -> Option<&'a WorkspaceSummary> {
447    let direct = workspaces
448        .iter()
449        .filter_map(|workspace| {
450            let path = workspace.path.as_deref()?;
451            if workspace_root == path || workspace_root.starts_with(&format!("{}/", path)) {
452                Some((path.len(), workspace))
453            } else {
454                None
455            }
456        })
457        .max_by_key(|(path_len, _)| *path_len)
458        .map(|(_, workspace)| workspace);
459
460    if direct.is_some() {
461        return direct;
462    }
463
464    let mut scored: Vec<(usize, &WorkspaceSummary)> = workspaces
465        .iter()
466        .filter_map(|workspace| {
467            let path = workspace.path.as_deref()?;
468            let score = shared_path_suffix_score(workspace_root, path);
469            (score > 0).then_some((score, workspace))
470        })
471        .collect();
472
473    scored.sort_by(|left, right| right.0.cmp(&left.0));
474
475    match scored.as_slice() {
476        [] => None,
477        [(score, workspace), ..] => {
478            let is_unique_best = scored
479                .get(1)
480                .map(|(next_score, _)| next_score < score)
481                .unwrap_or(true);
482            if is_unique_best {
483                Some(*workspace)
484            } else {
485                None
486            }
487        }
488    }
489}
490
/// Count how many trailing path components `left` and `right` have in common.
///
/// Both strings are split on `/`; empty components (from leading, trailing or
/// doubled slashes) are skipped.  Components are compared from the last one
/// backwards and counting stops at the first difference.
fn shared_path_suffix_score(left: &str, right: &str) -> usize {
    let left_tail = left.rsplit('/').filter(|segment| !segment.is_empty());
    let right_tail = right.rsplit('/').filter(|segment| !segment.is_empty());

    left_tail
        .zip(right_tail)
        .take_while(|(from_left, from_right)| from_left == from_right)
        .count()
}
506
#[cfg(test)]
mod tests {
    use super::*;

    /// Catch-all, trailing-wildcard and exact patterns.
    #[test]
    fn test_topic_matches() {
        assert!(topic_matches("agent.123.events", "*"));
        assert!(topic_matches("agent.123.events", "agent.*"));
        assert!(topic_matches("ralph.prd1", "ralph.*"));
        assert!(!topic_matches("task.42", "agent.*"));
        assert!(topic_matches("agent.123.events", "agent.123.events"));
    }

    /// Leading-wildcard patterns compare against the end of the topic.
    #[test]
    fn test_topic_matches_suffix_wildcard() {
        assert!(topic_matches("agent.123.events", ".*events"));
        assert!(!topic_matches("agent.123.status", ".*events"));
    }

    /// A pattern without wildcards must equal the topic, not merely prefix it.
    #[test]
    fn test_topic_matches_exact_mismatch() {
        assert!(!topic_matches("agent.123.events", "agent.123"));
        assert!(!topic_matches("agent.123", "agent.123.events"));
    }
}
519}