use super::AppState;
use axum::{
Json,
extract::State,
http::{HeaderMap, StatusCode, header},
response::{
IntoResponse,
sse::{Event, KeepAlive, Sse},
},
};
use std::collections::VecDeque;
use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
/// Bounded, thread-safe history of recent events.
///
/// Holds at most `capacity` JSON values; once full, each new event evicts the
/// oldest one. All access goes through an internal [`Mutex`], so the buffer can
/// be shared behind an `Arc` and used through `&self`.
pub struct EventBuffer {
    /// Stored events, oldest at the front.
    inner: Mutex<VecDeque<serde_json::Value>>,
    /// Maximum number of events retained at any time.
    capacity: usize,
}

impl EventBuffer {
    /// Creates an empty buffer that retains at most `capacity` events.
    ///
    /// A `capacity` of zero yields a buffer that never stores anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Appends `event`, evicting the oldest entries so the buffer never holds
    /// more than `capacity` events.
    pub fn push(&self, event: serde_json::Value) {
        // A zero-capacity buffer must stay empty. Without this guard the first
        // push would succeed and every later one would grow the deque without
        // bound, because `len == capacity` could never hold again.
        if self.capacity == 0 {
            return;
        }
        let mut events = self.guard();
        // `>=` rather than `==` so the bound is restored even if the deque is
        // ever found above capacity.
        while events.len() >= self.capacity {
            events.pop_front();
        }
        events.push_back(event);
    }

    /// Returns a copy of the buffered events, oldest first.
    pub fn snapshot(&self) -> Vec<serde_json::Value> {
        self.guard().iter().cloned().collect()
    }

    /// Locks the deque, recovering the guard if the mutex was poisoned.
    ///
    /// The deque is only touched through individual `pop_front` / `push_back`
    /// calls and read-only iteration, so a panic in another lock holder cannot
    /// leave it half-updated; continuing is preferable to panicking in every
    /// later caller.
    fn guard(&self) -> std::sync::MutexGuard<'_, VecDeque<serde_json::Value>> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
/// Streams live events to the client as Server-Sent Events.
///
/// When `state.pairing.require_pairing()` is true, the request must carry an
/// `Authorization: Bearer <token>` header whose token is accepted by
/// `state.pairing.is_authenticated`; otherwise the handler replies with
/// `401 Unauthorized`. A missing, non-UTF-8 or non-Bearer header is treated as
/// an empty token.
///
/// On success the handler subscribes to `state.event_tx` and forwards every
/// received JSON value as the `data` of one SSE event, with the default
/// keep-alive enabled.
pub async fn handle_sse_events(
    State(state): State<AppState>,
    headers: HeaderMap,
) -> impl IntoResponse {
    // `||` short-circuits, so the token is only inspected when pairing is on.
    let authorized = !state.pairing.require_pairing() || {
        let bearer = headers
            .get(header::AUTHORIZATION)
            .and_then(|raw| raw.to_str().ok())
            .and_then(|raw| raw.strip_prefix("Bearer "))
            .unwrap_or_default();
        state.pairing.is_authenticated(bearer)
    };
    if !authorized {
        let rejection = (
            StatusCode::UNAUTHORIZED,
            "Unauthorized — provide Authorization: Bearer <token>",
        );
        return rejection.into_response();
    }

    // Receive errors are skipped instead of ending the stream; only
    // successfully received values become SSE events.
    let to_sse = |item: Result<
        serde_json::Value,
        tokio_stream::wrappers::errors::BroadcastStreamRecvError,
    >| {
        item.ok()
            .map(|value| Ok::<_, Infallible>(Event::default().data(value.to_string())))
    };

    let receiver = state.event_tx.subscribe();
    let events = BroadcastStream::new(receiver).filter_map(to_sse);

    Sse::new(events)
        .keep_alive(KeepAlive::default())
        .into_response()
}
/// Returns the buffered event history as JSON.
///
/// The request is first checked with `super::api::require_auth`; if that
/// fails, its error is returned as the response. Otherwise the reply is
/// `{"events": [...]}` containing a snapshot of `state.event_buffer`.
pub async fn handle_events_history(
    State(state): State<AppState>,
    headers: HeaderMap,
) -> impl IntoResponse {
    match super::api::require_auth(&state, &headers) {
        Err(rejection) => rejection.into_response(),
        Ok(_) => {
            let history = state.event_buffer.snapshot();
            Json(serde_json::json!({ "events": history })).into_response()
        }
    }
}
/// Observer wrapper that bundles another observer with a broadcast sender and
/// a shared [`EventBuffer`].
pub struct BroadcastObserver {
    /// The wrapped observer.
    inner: Box<dyn crate::observability::Observer>,
    /// Broadcast channel sender carrying JSON values.
    tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Shared event buffer.
    buffer: Arc<EventBuffer>,
}

impl BroadcastObserver {
    /// Builds a `BroadcastObserver` from the observer to wrap, the broadcast
    /// sender, and the shared buffer.
    pub fn new(
        inner: Box<dyn crate::observability::Observer>,
        tx: tokio::sync::broadcast::Sender<serde_json::Value>,
        buffer: Arc<EventBuffer>,
    ) -> Self {
        BroadcastObserver { inner, tx, buffer }
    }

    /// Borrows the wrapped observer.
    pub fn inner(&self) -> &dyn crate::observability::Observer {
        &*self.inner
    }
}
impl crate::observability::Observer for BroadcastObserver {
fn record_event(&self, event: &crate::observability::ObserverEvent) {
self.inner.record_event(event);
let json = match event {
crate::observability::ObserverEvent::LlmRequest {
provider, model, ..
} => serde_json::json!({
"type": "llm_request",
"provider": provider,
"model": model,
"timestamp": chrono::Utc::now().to_rfc3339(),
}),
crate::observability::ObserverEvent::ToolCall {
tool,
duration,
success,
} => serde_json::json!({
"type": "tool_call",
"tool": tool,
"duration_ms": duration.as_millis(),
"success": success,
"timestamp": chrono::Utc::now().to_rfc3339(),
}),
crate::observability::ObserverEvent::ToolCallStart { tool, .. } => serde_json::json!({
"type": "tool_call_start",
"tool": tool,
"timestamp": chrono::Utc::now().to_rfc3339(),
}),
crate::observability::ObserverEvent::Error { component, message } => {
serde_json::json!({
"type": "error",
"component": component,
"message": message,
"timestamp": chrono::Utc::now().to_rfc3339(),
})
}
crate::observability::ObserverEvent::AgentStart { provider, model } => {
serde_json::json!({
"type": "agent_start",
"provider": provider,
"model": model,
"timestamp": chrono::Utc::now().to_rfc3339(),
})
}
crate::observability::ObserverEvent::AgentEnd {
provider,
model,
duration,
tokens_used,
cost_usd,
} => serde_json::json!({
"type": "agent_end",
"provider": provider,
"model": model,
"duration_ms": duration.as_millis(),
"tokens_used": tokens_used,
"cost_usd": cost_usd,
"timestamp": chrono::Utc::now().to_rfc3339(),
}),
_ => return, };
self.buffer.push(json.clone());
let _ = self.tx.send(json);
}
fn record_metric(&self, metric: &crate::observability::traits::ObserverMetric) {
self.inner.record_metric(metric);
}
fn flush(&self) {
self.inner.flush();
}
fn name(&self) -> &str {
"broadcast"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}