mcp_runner/sse_proxy/
events.rs1use crate::sse_proxy::types::{SSEEvent, SSEMessage};
7use crate::transport::json_rpc::{JSON_RPC_VERSION, JsonRpcError, JsonRpcResponse, error_codes}; use actix_web::web::Bytes;
9use tokio::sync::broadcast;
10use tracing;
11
12#[derive(Clone)]
18pub struct EventManager {
19 sender: broadcast::Sender<SSEMessage>,
21}
22
23impl EventManager {
24 pub fn new(capacity: usize) -> Self {
26 let (sender, _) = broadcast::channel(capacity);
27 Self { sender }
28 }
29
30 pub fn send_initial_config(&self, message_url: &str, _servers: &serde_json::Value) {
32 let message = SSEMessage::new("endpoint", message_url, None);
37
38 let formatted_message = Self::format_sse_message(&message);
40 let formatted_str = std::str::from_utf8(&formatted_message)
41 .unwrap_or("Could not convert SSE message to string");
42
43 tracing::debug!(
44 raw_message = %formatted_str,
45 event_type = "endpoint",
46 "Raw SSE message being sent to client"
47 );
48
49 tracing::debug!(
51 event = "endpoint",
52 data = %message_url,
53 "Sending message URL path as data"
54 );
55
56 self.send_sse_message(message, "endpoint");
58 }
59
60 pub fn subscribe(&self) -> broadcast::Receiver<SSEMessage> {
62 self.sender.subscribe()
63 }
64
65 pub fn send_tool_response(
67 &self,
68 request_id: &str,
69 _server_id: &str, _tool_name: &str, data: serde_json::Value,
72 ) {
73 let result_value = if let Some(result) = data.get("result") {
75 result.clone()
78 } else {
79 data
81 };
82
83 let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
85 serde_json::Value::Number(serde_json::Number::from(num_id))
86 } else {
87 serde_json::Value::String(request_id.to_string())
88 };
89
90 let json_rpc_response = serde_json::json!({
92 "jsonrpc": "2.0",
93 "id": id_value,
94 "result": result_value
95 });
96
97 match serde_json::to_string(&json_rpc_response) {
99 Ok(json_data) => {
100 let message = SSEMessage::new("message", &json_data, Some(request_id));
102
103 tracing::debug!(
105 request_id = %request_id,
106 message_data = %json_data,
107 "Sending tool response via SSE"
108 );
109
110 self.send_sse_message(message, "message"); }
112 Err(e) => {
113 tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC response");
114 }
115 }
116 }
117
118 pub fn send_tool_error(&self, request_id: &str, server_id: &str, tool_name: &str, error: &str) {
120 let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
122 serde_json::Value::Number(serde_json::Number::from(num_id))
123 } else {
124 serde_json::Value::String(request_id.to_string())
125 };
126
127 let error_data = serde_json::json!({
131 "serverId": server_id,
132 "toolName": tool_name
133 });
134 let rpc_error = JsonRpcError {
135 code: error_codes::SERVER_ERROR,
136 message: error.to_string(),
137 data: Some(error_data),
138 };
139
140 let response = JsonRpcResponse {
142 jsonrpc: JSON_RPC_VERSION.to_string(),
143 id: id_value,
144 result: None,
145 error: Some(rpc_error),
146 };
147
148 match serde_json::to_string(&response) {
150 Ok(json_data) => {
151 let message = SSEMessage::new("message", &json_data, Some(request_id));
153
154 tracing::debug!(
156 request_id = %request_id,
157 message_data = %json_data,
158 "Sending tool error via SSE"
159 );
160
161 self.send_sse_message(message, "message"); }
163 Err(e) => {
164 tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC error response");
165 }
166 }
167 }
168
169 pub fn send_server_status(&self, server_name: &str, server_id: &str, status: &str) {
171 let event = SSEEvent::ServerStatus {
172 server_name: server_name.to_string(),
173 server_id: server_id.to_string(),
174 status: status.to_string(),
175 };
176 match serde_json::to_string(&event) {
178 Ok(json_data) => {
179 let message = SSEMessage::new("server-status", &json_data, None);
181 self.send_sse_message(message, "server-status"); }
183 Err(e) => {
184 tracing::error!(
185 error = %e,
186 event_type = "server-status",
187 "Failed to serialize SSE event payload"
188 );
189 }
190 }
191 }
192
193 pub fn send_notification(&self, title: &str, message: &str, level: &str) {
195 let event = SSEEvent::Notification {
196 title: title.to_string(),
197 message: message.to_string(),
198 level: level.to_string(),
199 };
200 match serde_json::to_string(&event) {
202 Ok(json_data) => {
203 let message = SSEMessage::new("notification", &json_data, None);
205 self.send_sse_message(message, "notification"); }
207 Err(e) => {
208 tracing::error!(
209 error = %e,
210 event_type = "notification",
211 "Failed to serialize SSE event payload"
212 );
213 }
214 }
215 }
216
217 fn send_sse_message(&self, message: SSEMessage, event_type_name: &str) {
220 match self.sender.send(message) {
221 Ok(receiver_count) => {
222 if receiver_count > 0 {
223 tracing::debug!(
224 event_type = event_type_name,
225 receivers = receiver_count,
226 "SSE event sent to clients"
227 );
228 } else {
229 tracing::trace!(
230 event_type = event_type_name,
231 "SSE event created but no clients connected"
232 );
233 }
234 }
235 Err(e) => {
236 tracing::error!(
237 error = %e,
238 event_type = event_type_name,
239 "Failed to broadcast SSE event"
240 );
241 }
242 }
243 }
244
245 pub fn format_sse_message(message: &SSEMessage) -> Bytes {
247 let mut result = String::new();
248
249 if let Some(id) = &message.id {
250 result.push_str(&format!("id: {}\n", id));
251 }
252
253 result.push_str(&format!("event: {}\n", message.event));
254 result.push_str(&format!("data: {}\n\n", message.data));
255
256 Bytes::from(result)
257 }
258}