1use crate::bus::BusEnvelope;
12use chrono::{DateTime, Utc};
13use serde::Deserialize;
14use std::collections::VecDeque;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use tokio::sync::RwLock;
19use tracing::{debug, info, warn};
20
21const DEFAULT_BUFFER_SIZE: usize = 1_000;
23
24const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(3);
26
27#[derive(Debug)]
34pub struct BusBridge {
35 buffer: Arc<RwLock<VecDeque<BusEnvelope>>>,
37 connected: Arc<AtomicBool>,
39 total_received: Arc<AtomicU64>,
41 bus_url: String,
43 auth_token: Option<String>,
45 capacity: usize,
47}
48
49impl BusBridge {
50 pub fn new(bus_url: String) -> Self {
52 Self::with_auth(bus_url, None)
53 }
54
55 pub fn with_auth(bus_url: String, auth_token: Option<String>) -> Self {
57 Self {
58 buffer: Arc::new(RwLock::new(VecDeque::with_capacity(DEFAULT_BUFFER_SIZE))),
59 connected: Arc::new(AtomicBool::new(false)),
60 total_received: Arc::new(AtomicU64::new(0)),
61 bus_url,
62 auth_token,
63 capacity: DEFAULT_BUFFER_SIZE,
64 }
65 }
66
67 pub fn spawn(self) -> Arc<Self> {
71 let bridge = Arc::new(self);
72 let bg = Arc::clone(&bridge);
73 tokio::spawn(async move {
74 bg.reader_loop().await;
75 });
76 bridge
77 }
78
79 pub async fn recent_events(
81 &self,
82 topic_filter: Option<&str>,
83 limit: usize,
84 since: Option<DateTime<Utc>>,
85 ) -> Vec<BusEnvelope> {
86 let buf = self.buffer.read().await;
87 buf.iter()
88 .rev()
89 .filter(|env| {
90 if let Some(filter) = topic_filter {
91 topic_matches(&env.topic, filter)
92 } else {
93 true
94 }
95 })
96 .filter(|env| {
97 if let Some(ts) = since {
98 env.timestamp >= ts
99 } else {
100 true
101 }
102 })
103 .take(limit)
104 .cloned()
105 .collect::<Vec<_>>()
106 .into_iter()
107 .rev() .collect()
109 }
110
111 pub fn status(&self) -> BusBridgeStatus {
113 BusBridgeStatus {
114 connected: self.connected.load(Ordering::Relaxed),
115 total_received: self.total_received.load(Ordering::Relaxed),
116 bus_url: self.bus_url.clone(),
117 buffer_capacity: self.capacity,
118 }
119 }
120
121 pub async fn buffer_len(&self) -> usize {
123 self.buffer.read().await.len()
124 }
125
126 async fn reader_loop(&self) {
131 loop {
132 info!(url = %self.bus_url, "BusBridge: connecting to bus SSE stream");
133 match self.read_sse_stream().await {
134 Ok(()) => {
135 info!("BusBridge: SSE stream closed normally");
136 }
137 Err(e) => {
138 warn!(error = %e, "BusBridge: SSE stream error, reconnecting");
139 }
140 }
141 self.connected.store(false, Ordering::Relaxed);
142 tokio::time::sleep(RECONNECT_DELAY).await;
143 }
144 }
145
146 async fn read_sse_stream(&self) -> anyhow::Result<()> {
148 let client = reqwest::Client::new();
149 let mut req = client
150 .get(&self.bus_url)
151 .header("Accept", "text/event-stream");
152 if let Some(token) = self
153 .auth_token
154 .as_deref()
155 .filter(|value| !value.trim().is_empty())
156 {
157 req = req.bearer_auth(token);
158 }
159 let resp = req.send().await?;
160
161 if !resp.status().is_success() {
162 anyhow::bail!("SSE endpoint returned {}", resp.status());
163 }
164
165 self.connected.store(true, Ordering::Relaxed);
166 info!("BusBridge: connected to SSE stream");
167
168 let mut event_type = String::new();
173 let mut data_buf = String::new();
174
175 use futures::StreamExt;
176 let mut byte_stream = resp.bytes_stream();
177
178 let mut line_buf = String::new();
180
181 while let Some(chunk) = byte_stream.next().await {
182 let chunk = chunk?;
183 let text = String::from_utf8_lossy(&chunk);
184
185 for ch in text.chars() {
186 if ch == '\n' {
187 let line = std::mem::take(&mut line_buf);
188 self.process_sse_line(&line, &mut event_type, &mut data_buf)
189 .await;
190 } else {
191 line_buf.push(ch);
192 }
193 }
194 }
195
196 Ok(())
197 }
198
199 async fn process_sse_line(&self, line: &str, event_type: &mut String, data_buf: &mut String) {
201 if line.is_empty() {
202 if event_type == "bus" && !data_buf.is_empty() {
204 match serde_json::from_str::<BusEnvelope>(data_buf) {
205 Ok(envelope) => {
206 self.push_envelope(envelope).await;
207 }
208 Err(e) => {
209 debug!(error = %e, "BusBridge: failed to parse bus envelope");
210 }
211 }
212 }
213 event_type.clear();
214 data_buf.clear();
215 } else if let Some(rest) = line.strip_prefix("event:") {
216 *event_type = rest.trim().to_string();
217 } else if let Some(rest) = line.strip_prefix("data:") {
218 if !data_buf.is_empty() {
219 data_buf.push('\n');
220 }
221 data_buf.push_str(rest.trim());
222 }
223 }
225
226 async fn push_envelope(&self, envelope: BusEnvelope) {
228 let mut buf = self.buffer.write().await;
229 if buf.len() >= self.capacity {
230 buf.pop_front();
231 }
232 buf.push_back(envelope);
233 drop(buf);
234 self.total_received.fetch_add(1, Ordering::Relaxed);
235 }
236}
237
238pub async fn resolve_worker_bus_url(
240 control_plane_url: &str,
241 worker_id: &str,
242 token: Option<&str>,
243) -> anyhow::Result<String> {
244 let worker_url = format!(
245 "{}/v1/agent/workers/{}",
246 control_plane_url.trim_end_matches('/'),
247 worker_id
248 );
249
250 let client = reqwest::Client::new();
251 let mut req = client.get(worker_url);
252 if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
253 req = req.bearer_auth(token);
254 }
255
256 let worker: serde_json::Value = req.send().await?.error_for_status()?.json().await?;
257
258 let has_bus_interface = worker
259 .get("interfaces")
260 .and_then(|value| value.get("bus"))
261 .and_then(|value| value.get("stream_url"))
262 .and_then(|value| value.as_str())
263 .is_some();
264 let has_http_interface = worker
265 .get("interfaces")
266 .and_then(|value| value.get("http"))
267 .and_then(|value| value.get("base_url"))
268 .and_then(|value| value.as_str())
269 .is_some();
270
271 if has_bus_interface || has_http_interface {
272 return Ok(format!(
273 "{}/v1/agent/workers/{}/bus/stream",
274 control_plane_url.trim_end_matches('/'),
275 worker_id
276 ));
277 }
278
279 anyhow::bail!(
280 "Worker '{}' does not advertise a first-class bus interface",
281 worker_id
282 )
283}
284
285#[derive(Debug, Deserialize)]
286struct WorkspaceSummary {
287 id: String,
288 path: Option<String>,
289}
290
291#[derive(Debug, Deserialize)]
292struct WorkspaceDetails {
293 worker_id: Option<String>,
294}
295
296pub async fn resolve_worker_bus_url_for_workspace(
297 control_plane_url: &str,
298 workspace_id: &str,
299 token: Option<&str>,
300) -> anyhow::Result<String> {
301 let workspace_url = format!(
302 "{}/v1/agent/workspaces/{}",
303 control_plane_url.trim_end_matches('/'),
304 urlencoding::encode(workspace_id)
305 );
306
307 let client = reqwest::Client::new();
308 let mut req = client.get(workspace_url);
309 if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
310 req = req.bearer_auth(token);
311 }
312
313 let workspace: WorkspaceDetails = req.send().await?.error_for_status()?.json().await?;
314 let worker_id = workspace
315 .worker_id
316 .as_deref()
317 .map(str::trim)
318 .filter(|value| !value.is_empty())
319 .ok_or_else(|| {
320 anyhow::anyhow!(
321 "Workspace '{}' is not currently assigned to a worker; pass --worker-id or register the workspace on a worker",
322 workspace_id
323 )
324 })?;
325
326 resolve_worker_bus_url(control_plane_url, worker_id, token).await
327}
328
329pub async fn resolve_workspace_id_from_path(
330 control_plane_url: &str,
331 workspace_root: &Path,
332 token: Option<&str>,
333) -> anyhow::Result<Option<String>> {
334 let workspace_root = normalize_local_path(workspace_root)?;
335 let workspace_root = workspace_root.to_string_lossy().to_string();
336 let workspaces_url = format!(
337 "{}/v1/agent/workspaces",
338 control_plane_url.trim_end_matches('/')
339 );
340
341 let client = reqwest::Client::new();
342 let mut req = client.get(workspaces_url);
343 if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
344 req = req.bearer_auth(token);
345 }
346
347 let workspaces: Vec<WorkspaceSummary> = req.send().await?.error_for_status()?.json().await?;
348 Ok(best_workspace_match(&workspace_root, &workspaces).map(|workspace| workspace.id.clone()))
349}
350
351pub async fn resolve_default_worker_bus_url(
354 control_plane_url: &str,
355 token: Option<&str>,
356) -> anyhow::Result<String> {
357 let workers_url = format!(
358 "{}/v1/agent/workers",
359 control_plane_url.trim_end_matches('/')
360 );
361
362 let client = reqwest::Client::new();
363 let mut req = client.get(workers_url);
364 if let Some(token) = token.filter(|value| !value.trim().is_empty()) {
365 req = req.bearer_auth(token);
366 }
367
368 let workers: Vec<serde_json::Value> = req.send().await?.error_for_status()?.json().await?;
369
370 let candidates = workers
371 .into_iter()
372 .filter(|worker| {
373 let status = worker
374 .get("status")
375 .and_then(|value| value.as_str())
376 .unwrap_or_default();
377 status == "active"
378 && worker
379 .get("interfaces")
380 .and_then(|value| value.as_object())
381 .map(|value| !value.is_empty())
382 .unwrap_or(false)
383 })
384 .collect::<Vec<_>>();
385
386 match candidates.as_slice() {
387 [worker] => {
388 let worker_id = worker
389 .get("worker_id")
390 .and_then(|value| value.as_str())
391 .ok_or_else(|| anyhow::anyhow!("Active worker is missing worker_id"))?;
392 resolve_worker_bus_url(control_plane_url, worker_id, token).await
393 }
394 [] => anyhow::bail!(
395 "No active workers with first-class interfaces were found; deploy/register a worker or provide --worker-id"
396 ),
397 workers => {
398 let worker_ids = workers
399 .iter()
400 .filter_map(|worker| worker.get("worker_id").and_then(|value| value.as_str()))
401 .collect::<Vec<_>>()
402 .join(", ");
403 anyhow::bail!(
404 "Multiple active workers are registered ({worker_ids}); provide --worker-id to choose one"
405 )
406 }
407 }
408}
409
410#[derive(Debug, Clone, serde::Serialize)]
412pub struct BusBridgeStatus {
413 pub connected: bool,
414 pub total_received: u64,
415 pub bus_url: String,
416 pub buffer_capacity: usize,
417}
418
419fn topic_matches(topic: &str, pattern: &str) -> bool {
423 if pattern == "*" {
424 return true;
425 }
426 if let Some(prefix) = pattern.strip_suffix(".*") {
427 return topic.starts_with(prefix);
428 }
429 if let Some(suffix) = pattern.strip_prefix(".*") {
430 return topic.ends_with(suffix);
431 }
432 topic == pattern
433}
434
435fn normalize_local_path(path: &Path) -> anyhow::Result<PathBuf> {
436 if path.is_absolute() {
437 return Ok(path.to_path_buf());
438 }
439
440 Ok(std::env::current_dir()?.join(path))
441}
442
443fn best_workspace_match<'a>(
444 workspace_root: &str,
445 workspaces: &'a [WorkspaceSummary],
446) -> Option<&'a WorkspaceSummary> {
447 let direct = workspaces
448 .iter()
449 .filter_map(|workspace| {
450 let path = workspace.path.as_deref()?;
451 if workspace_root == path || workspace_root.starts_with(&format!("{}/", path)) {
452 Some((path.len(), workspace))
453 } else {
454 None
455 }
456 })
457 .max_by_key(|(path_len, _)| *path_len)
458 .map(|(_, workspace)| workspace);
459
460 if direct.is_some() {
461 return direct;
462 }
463
464 let mut scored: Vec<(usize, &WorkspaceSummary)> = workspaces
465 .iter()
466 .filter_map(|workspace| {
467 let path = workspace.path.as_deref()?;
468 let score = shared_path_suffix_score(workspace_root, path);
469 (score > 0).then_some((score, workspace))
470 })
471 .collect();
472
473 scored.sort_by(|left, right| right.0.cmp(&left.0));
474
475 match scored.as_slice() {
476 [] => None,
477 [(score, workspace), ..] => {
478 let is_unique_best = scored
479 .get(1)
480 .map(|(next_score, _)| next_score < score)
481 .unwrap_or(true);
482 if is_unique_best {
483 Some(*workspace)
484 } else {
485 None
486 }
487 }
488 }
489}
490
491fn shared_path_suffix_score(left: &str, right: &str) -> usize {
492 let left_parts: Vec<&str> = left.split('/').filter(|part| !part.is_empty()).collect();
493 let right_parts: Vec<&str> = right.split('/').filter(|part| !part.is_empty()).collect();
494
495 let mut score = 0usize;
496 for (left_part, right_part) in left_parts.iter().rev().zip(right_parts.iter().rev()) {
497 if left_part == right_part {
498 score += 1;
499 } else {
500 break;
501 }
502 }
503
504 score
505}
506
507#[cfg(test)]
508mod tests {
509 use super::*;
510
511 #[test]
512 fn test_topic_matches() {
513 assert!(topic_matches("agent.123.events", "*"));
514 assert!(topic_matches("agent.123.events", "agent.*"));
515 assert!(topic_matches("ralph.prd1", "ralph.*"));
516 assert!(!topic_matches("task.42", "agent.*"));
517 assert!(topic_matches("agent.123.events", "agent.123.events"));
518 }
519}