use crate::sse_proxy::types::{SSEEvent, SSEMessage};
use crate::transport::json_rpc::{JSON_RPC_VERSION, JsonRpcError, JsonRpcResponse, error_codes}; use actix_web::web::Bytes;
use tokio::sync::broadcast;
use tracing;
/// Hub that publishes [`SSEMessage`] values to subscribers over a
/// `tokio::sync::broadcast` channel.
///
/// The type derives `Clone`; cloning the manager clones the sender handle it
/// holds. Receivers are handed out through `subscribe`.
#[derive(Clone)]
pub struct EventManager {
// Sending half of the broadcast channel created in `new`.
sender: broadcast::Sender<SSEMessage>,
}
impl EventManager {
/// Creates a manager backed by a freshly created broadcast channel.
///
/// `capacity` is passed unchanged to `broadcast::channel`. The receiver half
/// produced alongside the sender is discarded immediately; consumers obtain
/// their own receivers through [`EventManager::subscribe`].
///
/// NOTE(review): tokio documents `broadcast::channel` as panicking when the
/// capacity is 0 — confirm callers always pass a positive value.
pub fn new(capacity: usize) -> Self {
    // Element `.0` of the returned pair is the sending half; the receiving
    // half is dropped at the end of this expression.
    Self {
        sender: broadcast::channel(capacity).0,
    }
}
/// Broadcasts an `endpoint` event whose data is `message_url`.
///
/// The message is built without an event id. Before it is handed to the
/// channel, its wire representation is rendered once so it can be included
/// in a debug log entry.
///
/// `_servers` is accepted but not used by this method.
pub fn send_initial_config(&self, message_url: &str, _servers: &serde_json::Value) {
    let endpoint_event = SSEMessage::new("endpoint", message_url, None);

    // Render the wire form purely for the debug output below; the message
    // itself (not these bytes) is what gets broadcast.
    let wire_bytes = Self::format_sse_message(&endpoint_event);
    let wire_text = match std::str::from_utf8(&wire_bytes) {
        Ok(text) => text,
        Err(_) => "Could not convert SSE message to string",
    };

    tracing::debug!(
        raw_message = %wire_text,
        event_type = "endpoint",
        "Raw SSE message being sent to client"
    );
    tracing::debug!(
        event = "endpoint",
        data = %message_url,
        "Sending message URL path as data"
    );

    self.send_sse_message(endpoint_event, "endpoint");
}
/// Returns a new receiver attached to this manager's broadcast channel.
///
/// Each call asks the underlying sender for a fresh
/// `broadcast::Receiver<SSEMessage>`.
pub fn subscribe(&self) -> broadcast::Receiver<SSEMessage> {
    broadcast::Sender::subscribe(&self.sender)
}
pub fn send_tool_response(
&self,
request_id: &str,
_server_id: &str, _tool_name: &str, data: serde_json::Value,
) {
let result_value = if let Some(result) = data.get("result") {
result.clone()
} else {
data
};
let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
serde_json::Value::Number(serde_json::Number::from(num_id))
} else {
serde_json::Value::String(request_id.to_string())
};
let json_rpc_response = serde_json::json!({
"jsonrpc": "2.0",
"id": id_value,
"result": result_value
});
match serde_json::to_string(&json_rpc_response) {
Ok(json_data) => {
let message = SSEMessage::new("message", &json_data, Some(request_id));
tracing::debug!(
request_id = %request_id,
message_data = %json_data,
"Sending tool response via SSE"
);
self.send_sse_message(message, "message"); }
Err(e) => {
tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC response");
}
}
}
pub fn send_tool_error(&self, request_id: &str, server_id: &str, tool_name: &str, error: &str) {
let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
serde_json::Value::Number(serde_json::Number::from(num_id))
} else {
serde_json::Value::String(request_id.to_string())
};
let error_data = serde_json::json!({
"serverId": server_id,
"toolName": tool_name
});
let rpc_error = JsonRpcError {
code: error_codes::SERVER_ERROR,
message: error.to_string(),
data: Some(error_data),
};
let response = JsonRpcResponse {
jsonrpc: JSON_RPC_VERSION.to_string(),
id: id_value,
result: None,
error: Some(rpc_error),
};
match serde_json::to_string(&response) {
Ok(json_data) => {
let message = SSEMessage::new("message", &json_data, Some(request_id));
tracing::debug!(
request_id = %request_id,
message_data = %json_data,
"Sending tool error via SSE"
);
self.send_sse_message(message, "message"); }
Err(e) => {
tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC error response");
}
}
}
/// Broadcasts a `server-status` event describing one server.
///
/// The three arguments are copied into an `SSEEvent::ServerStatus` value,
/// which is serialized to JSON and sent without an event id. If
/// serialization fails the error is logged and nothing is sent.
pub fn send_server_status(&self, server_name: &str, server_id: &str, status: &str) {
    let status_event = SSEEvent::ServerStatus {
        server_name: server_name.to_owned(),
        server_id: server_id.to_owned(),
        status: status.to_owned(),
    };

    // Bail out early on a serialization failure so the happy path stays flat.
    let payload = match serde_json::to_string(&status_event) {
        Ok(serialized) => serialized,
        Err(e) => {
            tracing::error!(
                error = %e,
                event_type = "server-status",
                "Failed to serialize SSE event payload"
            );
            return;
        }
    };

    let outgoing = SSEMessage::new("server-status", &payload, None);
    self.send_sse_message(outgoing, "server-status");
}
/// Broadcasts a `notification` event.
///
/// `title`, `message` and `level` are copied into an
/// `SSEEvent::Notification` value, which is serialized to JSON and sent
/// without an event id. If serialization fails the error is logged and
/// nothing is sent.
pub fn send_notification(&self, title: &str, message: &str, level: &str) {
    let notification = SSEEvent::Notification {
        title: title.to_owned(),
        message: message.to_owned(),
        level: level.to_owned(),
    };

    // Bail out early on a serialization failure so the happy path stays flat.
    let payload = match serde_json::to_string(&notification) {
        Ok(serialized) => serialized,
        Err(e) => {
            tracing::error!(
                error = %e,
                event_type = "notification",
                "Failed to serialize SSE event payload"
            );
            return;
        }
    };

    // Named `outgoing` so it does not shadow the `message` parameter.
    let outgoing = SSEMessage::new("notification", &payload, None);
    self.send_sse_message(outgoing, "notification");
}
/// Hands `message` to the broadcast channel and logs the outcome.
///
/// `event_type_name` is used only as a field in the log entries.
///
/// Delivery is best-effort: when nobody is subscribed the message is
/// discarded and a trace-level entry is written.
fn send_sse_message(&self, message: SSEMessage, event_type_name: &str) {
    match self.sender.send(message) {
        Ok(receiver_count) => {
            tracing::debug!(
                event_type = event_type_name,
                receivers = receiver_count,
                "SSE event sent to clients"
            );
        }
        // tokio's `broadcast::Sender::send` fails only when there are no
        // active receivers; it never returns `Ok(0)`. An idle server with no
        // connected clients is a normal state, so this is reported at trace
        // level rather than as a broadcast error.
        Err(_) => {
            tracing::trace!(
                event_type = event_type_name,
                "SSE event created but no clients connected"
            );
        }
    }
}
/// Renders `message` in the `text/event-stream` wire format.
///
/// Output layout:
/// * an `id: …` line, only when the message has an id;
/// * an `event: …` line;
/// * one `data: …` line per line of the payload;
/// * a blank line terminating the event.
///
/// A payload containing line breaks is split across several `data:` lines.
/// Writing it into a single field would let the text after the first line
/// break be read as separate fields, or end the event early. `\r\n` and
/// bare `\r` are treated as line breaks as well as `\n`.
///
/// The id and event name are written as-is and are expected to be single-line.
pub fn format_sse_message(message: &SSEMessage) -> Bytes {
    let mut result = String::new();
    if let Some(id) = &message.id {
        result.push_str(&format!("id: {}\n", id));
    }
    result.push_str(&format!("event: {}\n", message.event));

    // Normalize every line-ending flavour to `\n` before splitting.
    let data = message.data.replace("\r\n", "\n").replace('\r', "\n");
    // `split` (unlike `lines`) yields one empty item for an empty payload, so
    // an empty message still produces a `data: ` line.
    for line in data.split('\n') {
        result.push_str("data: ");
        result.push_str(line);
        result.push('\n');
    }

    // Blank line: end of event.
    result.push('\n');
    Bytes::from(result)
}
}