mcp_runner/sse_proxy/
events.rs1use crate::sse_proxy::types::{SSEEvent, SSEMessage};
7use crate::transport::json_rpc::{JSON_RPC_VERSION, JsonRpcError, JsonRpcResponse}; use actix_web::web::Bytes;
9use tokio::sync::broadcast;
10use tracing;
11
12#[derive(Clone)]
18pub struct EventManager {
19 sender: broadcast::Sender<SSEMessage>,
21}
22
23impl EventManager {
24 pub fn new(capacity: usize) -> Self {
26 let (sender, _) = broadcast::channel(capacity);
27 Self { sender }
28 }
29
30 pub fn subscribe(&self) -> broadcast::Receiver<SSEMessage> {
32 self.sender.subscribe()
33 }
34
35 pub fn send_tool_response(
37 &self,
38 request_id: &str,
39 _server_id: &str, _tool_name: &str, data: serde_json::Value,
42 ) {
43 let response = JsonRpcResponse::success(request_id, data);
45
46 match serde_json::to_string(&response) {
48 Ok(json_data) => {
49 let message = SSEMessage::new("message", &json_data, Some(request_id));
51 self.send_sse_message(message, "message"); }
53 Err(e) => {
54 tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC success response");
55 }
56 }
57 }
58
59 pub fn send_tool_error(&self, request_id: &str, server_id: &str, tool_name: &str, error: &str) {
61 let error_data = serde_json::json!({
65 "serverId": server_id,
66 "toolName": tool_name
67 });
68 let rpc_error = JsonRpcError {
69 code: -32000, message: error.to_string(),
71 data: Some(error_data),
72 };
73
74 let response = JsonRpcResponse {
76 jsonrpc: JSON_RPC_VERSION.to_string(),
77 id: serde_json::Value::String(request_id.to_string()),
78 result: None,
79 error: Some(rpc_error),
80 };
81
82 match serde_json::to_string(&response) {
84 Ok(json_data) => {
85 let message = SSEMessage::new("message", &json_data, Some(request_id));
87 self.send_sse_message(message, "message"); }
89 Err(e) => {
90 tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC error response");
91 }
92 }
93 }
94
95 pub fn send_server_status(&self, server_name: &str, server_id: &str, status: &str) {
97 let event = SSEEvent::ServerStatus {
98 server_name: server_name.to_string(),
99 server_id: server_id.to_string(),
100 status: status.to_string(),
101 };
102 match serde_json::to_string(&event) {
104 Ok(json_data) => {
105 let message = SSEMessage::new("server-status", &json_data, None);
107 self.send_sse_message(message, "server-status"); }
109 Err(e) => {
110 tracing::error!(
111 error = %e,
112 event_type = "server-status",
113 "Failed to serialize SSE event payload"
114 );
115 }
116 }
117 }
118
119 pub fn send_notification(&self, title: &str, message: &str, level: &str) {
121 let event = SSEEvent::Notification {
122 title: title.to_string(),
123 message: message.to_string(),
124 level: level.to_string(),
125 };
126 match serde_json::to_string(&event) {
128 Ok(json_data) => {
129 let message = SSEMessage::new("notification", &json_data, None);
131 self.send_sse_message(message, "notification"); }
133 Err(e) => {
134 tracing::error!(
135 error = %e,
136 event_type = "notification",
137 "Failed to serialize SSE event payload"
138 );
139 }
140 }
141 }
142
143 fn send_sse_message(&self, message: SSEMessage, event_type_name: &str) {
146 match self.sender.send(message) {
147 Ok(receiver_count) => {
148 if receiver_count > 0 {
149 tracing::debug!(
150 event_type = event_type_name,
151 receivers = receiver_count,
152 "SSE event sent to clients"
153 );
154 } else {
155 tracing::trace!(
156 event_type = event_type_name,
157 "SSE event created but no clients connected"
158 );
159 }
160 }
161 Err(e) => {
162 tracing::error!(
163 error = %e,
164 event_type = event_type_name,
165 "Failed to broadcast SSE event"
166 );
167 }
168 }
169 }
170
171 pub fn format_sse_message(message: &SSEMessage) -> Bytes {
173 let mut result = String::new();
174
175 if let Some(id) = &message.id {
176 result.push_str(&format!("id: {}\n", id));
177 }
178
179 result.push_str(&format!("event: {}\n", message.event));
180 result.push_str(&format!("data: {}\n\n", message.data));
181
182 Bytes::from(result)
183 }
184}