use crate::bus::BusEnvelope;
use chrono::{DateTime, Utc};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Number of envelopes a freshly constructed [`BusBridge`] pre-allocates room for and
/// keeps before it starts dropping the oldest entry on every new arrival.
const DEFAULT_BUFFER_SIZE: usize = 1_000;
/// Pause between the end (or failure) of one SSE connection and the next connection attempt.
const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(3);
/// Reads `bus` events from an SSE endpoint and keeps the most recent ones in memory.
///
/// Build one with [`BusBridge::new`], then call [`BusBridge::spawn`] to start the
/// background reader task; the buffered envelopes can be queried with
/// [`BusBridge::recent_events`].
#[derive(Debug)]
pub struct BusBridge {
/// Buffered envelopes: new ones are pushed at the back, and the front entry is
/// dropped once `capacity` entries are held.
buffer: Arc<RwLock<VecDeque<BusEnvelope>>>,
/// Set to `true` after the SSE endpoint answers with a success status and back to
/// `false` whenever the stream ends or fails.
connected: Arc<AtomicBool>,
/// Count of envelopes pushed into `buffer`, including ones that were later evicted.
total_received: Arc<AtomicU64>,
/// URL that is requested (HTTP GET) to open the SSE stream.
bus_url: String,
/// Buffer length at which pushing a new envelope evicts the oldest one.
capacity: usize,
}
impl BusBridge {
/// Creates a bridge for `bus_url` that is not yet connected.
///
/// The buffer starts empty with room for `DEFAULT_BUFFER_SIZE` envelopes, and the
/// received counter starts at zero. No network activity happens until
/// [`BusBridge::spawn`] is called.
pub fn new(bus_url: String) -> Self {
    let capacity = DEFAULT_BUFFER_SIZE;
    let buffer = Arc::new(RwLock::new(VecDeque::with_capacity(capacity)));
    let connected = Arc::new(AtomicBool::new(false));
    let total_received = Arc::new(AtomicU64::new(0));
    Self {
        buffer,
        connected,
        total_received,
        bus_url,
        capacity,
    }
}
/// Moves the bridge behind an [`Arc`] and starts its reader loop on a Tokio task.
///
/// The returned handle shares state with the background task, which keeps its own
/// clone of the `Arc`. The task's join handle is discarded, so the task is detached.
pub fn spawn(self) -> Arc<Self> {
    let shared = Arc::new(self);
    let worker = Arc::clone(&shared);
    tokio::spawn(async move { worker.reader_loop().await });
    shared
}
/// Returns up to `limit` of the most recently buffered envelopes, in buffer order
/// (oldest of the selection first).
///
/// * `topic_filter` - when `Some`, only envelopes whose topic satisfies
///   `topic_matches` for the given pattern are considered.
/// * `limit` - maximum number of envelopes returned; `0` yields an empty vector.
/// * `since` - when `Some`, only envelopes with `timestamp >= since` are considered.
///
/// The buffer's read lock is held for the duration of the call.
pub async fn recent_events(
    &self,
    topic_filter: Option<&str>,
    limit: usize,
    since: Option<DateTime<Utc>>,
) -> Vec<BusEnvelope> {
    let guard = self.buffer.read().await;
    // Walk from the newest entry backwards so `take` keeps the latest matches.
    let mut selected: Vec<BusEnvelope> = guard
        .iter()
        .rev()
        .filter(|env| match topic_filter {
            Some(pattern) => topic_matches(&env.topic, pattern),
            None => true,
        })
        .filter(|env| match since {
            Some(cutoff) => env.timestamp >= cutoff,
            None => true,
        })
        .take(limit)
        .cloned()
        .collect();
    // Restore buffer order for the caller.
    selected.reverse();
    selected
}
/// Builds a snapshot of the connection flag, the received counter, the bus URL and
/// the buffer capacity.
///
/// The two atomics are read separately with relaxed ordering, so the snapshot is
/// not taken atomically as a whole.
pub fn status(&self) -> BusBridgeStatus {
    let connected = self.connected.load(Ordering::Relaxed);
    let total_received = self.total_received.load(Ordering::Relaxed);
    BusBridgeStatus {
        connected,
        total_received,
        bus_url: self.bus_url.clone(),
        buffer_capacity: self.capacity,
    }
}
/// Returns how many envelopes are currently held in the buffer.
pub async fn buffer_len(&self) -> usize {
    let guard = self.buffer.read().await;
    guard.len()
}
/// Runs forever: reads the SSE stream until it ends or fails, marks the bridge as
/// disconnected, waits `RECONNECT_DELAY`, then connects again.
async fn reader_loop(&self) {
    loop {
        info!(url = %self.bus_url, "BusBridge: connecting to bus SSE stream");

        if let Err(e) = self.read_sse_stream().await {
            warn!(error = %e, "BusBridge: SSE stream error, reconnecting");
        } else {
            info!("BusBridge: SSE stream closed normally");
        }

        // Whether the stream ended cleanly or not, we are no longer connected.
        self.connected.store(false, Ordering::Relaxed);
        tokio::time::sleep(RECONNECT_DELAY).await;
    }
}
/// Opens the SSE stream at `bus_url` and feeds it, line by line, to
/// `process_sse_line` until the server closes the connection.
///
/// The `connected` flag is set once the endpoint has answered with a success
/// status; resetting it is left to the caller.
///
/// Incoming bytes are split on the `\n` byte *before* being decoded as text. A
/// multi-byte UTF-8 character that straddles two network chunks is therefore
/// decoded intact instead of being replaced by U+FFFD on each side of the chunk
/// boundary. Bytes after the last `\n` are discarded when the stream ends.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, if the endpoint answers with a
/// non-success status, or if reading a chunk from the response body fails.
async fn read_sse_stream(&self) -> anyhow::Result<()> {
    use futures::StreamExt;

    let client = reqwest::Client::new();
    let resp = client
        .get(&self.bus_url)
        .header("Accept", "text/event-stream")
        .send()
        .await?;
    if !resp.status().is_success() {
        anyhow::bail!("SSE endpoint returned {}", resp.status());
    }
    self.connected.store(true, Ordering::Relaxed);
    info!("BusBridge: connected to SSE stream");

    let mut event_type = String::new();
    let mut data_buf = String::new();
    // Raw bytes of the line currently being assembled; may span several chunks.
    let mut pending: Vec<u8> = Vec::new();
    let mut byte_stream = resp.bytes_stream();

    while let Some(chunk) = byte_stream.next().await {
        let chunk = chunk?;
        let mut rest: &[u8] = &chunk;
        // 0x0A never occurs inside a multi-byte UTF-8 sequence, so splitting on the
        // raw byte is safe.
        while let Some(newline_at) = rest.iter().position(|&byte| byte == b'\n') {
            let (head, tail) = rest.split_at(newline_at);
            pending.extend_from_slice(head);
            {
                // Decode only now that the whole line is available.
                let line = String::from_utf8_lossy(&pending);
                self.process_sse_line(&line, &mut event_type, &mut data_buf)
                    .await;
            }
            pending.clear();
            // Skip the newline byte itself.
            rest = &tail[1..];
        }
        // Keep the unterminated remainder for the next chunk.
        pending.extend_from_slice(rest);
    }
    Ok(())
}
/// Applies one line of the SSE stream to the event being assembled.
///
/// * `line` - the line without its terminating `\n`; a single trailing `\r` is
///   tolerated and ignored.
/// * `event_type` - accumulator for the current event's `event:` field.
/// * `data_buf` - accumulator for the current event's `data:` fields, joined with
///   `\n`.
///
/// An empty line ends the current event: if its type is `bus` and it carries data,
/// the data is parsed as a JSON `BusEnvelope` and pushed into the buffer (a parse
/// failure is only logged at debug level). Both accumulators are then cleared.
/// Lines that are neither empty nor `event:`/`data:` fields are ignored.
async fn process_sse_line(&self, line: &str, event_type: &mut String, data_buf: &mut String) {
    // With CRLF line endings the caller's split on `\n` leaves a trailing `\r`.
    // Without stripping it, the blank separator line would arrive as "\r", never
    // count as empty, and no event would ever be dispatched.
    let line = line.strip_suffix('\r').unwrap_or(line);

    if line.is_empty() {
        if event_type == "bus" && !data_buf.is_empty() {
            match serde_json::from_str::<BusEnvelope>(data_buf) {
                Ok(envelope) => {
                    self.push_envelope(envelope).await;
                }
                Err(e) => {
                    debug!(error = %e, "BusBridge: failed to parse bus envelope");
                }
            }
        }
        // Start the next event from a clean slate, whatever happened to this one.
        event_type.clear();
        data_buf.clear();
    } else if let Some(rest) = line.strip_prefix("event:") {
        *event_type = rest.trim().to_string();
    } else if let Some(rest) = line.strip_prefix("data:") {
        // Multiple data lines of one event are joined with a newline.
        if !data_buf.is_empty() {
            data_buf.push('\n');
        }
        data_buf.push_str(rest.trim());
    }
}
/// Appends `envelope` to the buffer, evicting the oldest entry first when the
/// buffer already holds `capacity` envelopes, then bumps the received counter.
async fn push_envelope(&self, envelope: BusEnvelope) {
    {
        // Scope the write guard so it is released before the counter update.
        let mut queue = self.buffer.write().await;
        if queue.len() >= self.capacity {
            queue.pop_front();
        }
        queue.push_back(envelope);
    }
    self.total_received.fetch_add(1, Ordering::Relaxed);
}
}
/// Serializable snapshot of a [`BusBridge`], as produced by `BusBridge::status`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BusBridgeStatus {
/// Value of the bridge's connection flag when the snapshot was taken.
pub connected: bool,
/// Number of envelopes the bridge had pushed into its buffer so far.
pub total_received: u64,
/// URL of the SSE endpoint the bridge reads from.
pub bus_url: String,
/// Maximum number of envelopes the bridge keeps buffered.
pub buffer_capacity: usize,
}
/// Tests whether `topic` satisfies the wildcard `pattern`.
///
/// Supported pattern forms, checked in this order:
///
/// * `"*"` matches every topic.
/// * `"prefix.*"` matches `prefix` itself and any topic that continues with a dot
///   after it (`"agent.*"` matches `"agent"` and `"agent.1.events"`, but not
///   `"agentic.foo"`). A bare `".*"` has an empty prefix and matches every topic.
/// * `".*suffix"` matches any topic ending in `suffix`.
/// * anything else must equal the topic exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        if prefix.is_empty() {
            return true;
        }
        // Require a segment boundary after the prefix so that an unrelated topic
        // that merely shares leading characters does not match.
        return match topic.strip_prefix(prefix) {
            Some(rest) => rest.is_empty() || rest.starts_with('.'),
            None => false,
        };
    }
    if let Some(suffix) = pattern.strip_prefix(".*") {
        return topic.ends_with(suffix);
    }
    topic == pattern
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `topic_matches` against the match-all, prefix-wildcard and
    /// exact-match pattern forms.
    #[test]
    fn test_topic_matches() {
        // (topic, pattern, expected result)
        let cases = [
            ("agent.123.events", "*", true),
            ("agent.123.events", "agent.*", true),
            ("ralph.prd1", "ralph.*", true),
            ("task.42", "agent.*", false),
            ("agent.123.events", "agent.123.events", true),
        ];
        for &(topic, pattern, expected) in &cases {
            assert_eq!(
                topic_matches(topic, pattern),
                expected,
                "topic {:?} against pattern {:?}",
                topic,
                pattern
            );
        }
    }
}