use std::sync::Arc;
use futures_util::StreamExt;
use serde::Deserialize;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
use crate::commands::status::client::StatusClient;
use crate::commands::status::fetch;
use crate::commands::status::models::{AgentRow, ApprovalResponse, ApprovalsSummary, BudgetRow, RuntimeHealth};
use super::state::EventEntry;
#[derive(Debug)]
pub enum FeedMessage {
StatusUpdate {
runtime: RuntimeHealth,
agents: Vec<AgentRow>,
approvals_summary: ApprovalsSummary,
pending_approvals: Vec<ApprovalResponse>,
budget: BudgetRow,
},
Event(EventEntry),
WsDisconnected,
}
#[derive(Debug, Deserialize)]
struct WsEvent {
#[allow(dead_code)]
id: u64,
event_type: String,
agent_id: String,
payload: serde_json::Value,
timestamp: String,
}
const POLL_INTERVAL_SECS: u64 = 5;
pub fn spawn_rest_poller(api_url: &str, tx: mpsc::UnboundedSender<FeedMessage>) {
let client = StatusClient::new(api_url);
tokio::spawn(async move {
loop {
let snapshot = fetch::fetch_all(&client).await;
let pending_approvals = client
.list_approvals()
.await
.unwrap_or_default()
.into_iter()
.filter(|a| a.status == "pending")
.collect();
let msg = FeedMessage::StatusUpdate {
runtime: snapshot.runtime,
agents: snapshot.agents,
approvals_summary: snapshot.approvals,
pending_approvals,
budget: snapshot.budget,
};
if tx.send(msg).is_err() {
break; }
tokio::time::sleep(tokio::time::Duration::from_secs(POLL_INTERVAL_SECS)).await;
}
});
}
pub fn spawn_ws_listener(api_url: &str, tx: mpsc::UnboundedSender<FeedMessage>) {
let ws_url = build_ws_url(api_url);
let tx = Arc::new(tx);
tokio::spawn(async move {
let (ws_stream, _) = match tokio_tungstenite::connect_async(&ws_url).await {
Ok(conn) => conn,
Err(_) => {
let _ = tx.send(FeedMessage::WsDisconnected);
return;
}
};
let (_write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
let event: WsEvent = match serde_json::from_str(&text) {
Ok(e) => e,
Err(_) => continue,
};
let entry = ws_event_to_entry(&event);
if tx.send(FeedMessage::Event(entry)).is_err() {
break;
}
}
Ok(Message::Close(_)) => break,
Ok(_) => {} Err(_) => break,
}
}
let _ = tx.send(FeedMessage::WsDisconnected);
});
}
fn build_ws_url(api_url: &str) -> String {
let base = api_url
.replacen("https://", "wss://", 1)
.replacen("http://", "ws://", 1);
format!("{base}/api/v1/ws/events")
}
fn ws_event_to_entry(event: &WsEvent) -> EventEntry {
let message = match &event.payload {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
EventEntry {
timestamp: event.timestamp.clone(),
event_type: event.event_type.clone(),
agent_id: event.agent_id.clone(),
message,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_ws_url_http() {
assert_eq!(
build_ws_url("http://localhost:8080"),
"ws://localhost:8080/api/v1/ws/events"
);
}
#[test]
fn build_ws_url_https() {
assert_eq!(
build_ws_url("https://api.example.com"),
"wss://api.example.com/api/v1/ws/events"
);
}
#[test]
fn ws_event_string_payload() {
let event = WsEvent {
id: 1,
event_type: "violation".to_string(),
agent_id: "a1".to_string(),
payload: serde_json::Value::String("denied".to_string()),
timestamp: "2026-04-30T10:00:00Z".to_string(),
};
let entry = ws_event_to_entry(&event);
assert_eq!(entry.message, "denied");
assert_eq!(entry.event_type, "violation");
}
#[test]
fn ws_event_object_payload() {
let event = WsEvent {
id: 2,
event_type: "budget".to_string(),
agent_id: "a2".to_string(),
payload: serde_json::json!({"action": "alert", "amount": 100}),
timestamp: "2026-04-30T11:00:00Z".to_string(),
};
let entry = ws_event_to_entry(&event);
assert!(entry.message.contains("alert"));
assert!(entry.message.contains("100"));
}
}