mcp_runner/sse_proxy/
events.rs

1//! SSE event management for the Actix Web-based proxy.
2//!
3//! This module handles the broadcasting of Server-Sent Events (SSE) to connected clients
4//! using Actix Web's streaming capabilities.
5
6use crate::sse_proxy::types::{SSEEvent, SSEMessage};
7use crate::transport::json_rpc::{JSON_RPC_VERSION, JsonRpcError, JsonRpcResponse, error_codes}; // Import JSON-RPC types
8use actix_web::web::Bytes;
9use tokio::sync::broadcast;
10use tracing;
11
12/// Handles SSE event processing and broadcasting to connected clients
13///
14/// This struct manages the broadcasting of events to multiple SSE clients using a
15/// Tokio broadcast channel. It provides methods for sending different types of events
16/// and handling SSE connections.
17#[derive(Clone)]
18pub struct EventManager {
19    /// Broadcast channel for sending events to all connected clients
20    sender: broadcast::Sender<SSEMessage>,
21}
22
23impl EventManager {
24    /// Create a new EventManager with the specified channel capacity
25    pub fn new(capacity: usize) -> Self {
26        let (sender, _) = broadcast::channel(capacity);
27        Self { sender }
28    }
29
30    /// Send initial configuration information to a newly connected client
31    pub fn send_initial_config(&self, message_url: &str, _servers: &serde_json::Value) {
32        // The Python client expects just the URL path as the data, not JSON
33        // This matches the Python server implementation in SseServerTransport
34
35        // Create SSE message with only the path as data
36        let message = SSEMessage::new("endpoint", message_url, None);
37
38        // Log the full SSE message for debugging
39        let formatted_message = Self::format_sse_message(&message);
40        let formatted_str = std::str::from_utf8(&formatted_message)
41            .unwrap_or("Could not convert SSE message to string");
42
43        tracing::debug!(
44            raw_message = %formatted_str,
45            event_type = "endpoint",
46            "Raw SSE message being sent to client"
47        );
48
49        // Log the message data
50        tracing::debug!(
51            event = "endpoint",
52            data = %message_url,
53            "Sending message URL path as data"
54        );
55
56        // Send the message
57        self.send_sse_message(message, "endpoint");
58    }
59
60    /// Subscribe to events
61    pub fn subscribe(&self) -> broadcast::Receiver<SSEMessage> {
62        self.sender.subscribe()
63    }
64
65    /// Send a tool response event using JSON-RPC format
66    pub fn send_tool_response(
67        &self,
68        request_id: &str,
69        _server_id: &str, // No longer needed for the event payload itself
70        _tool_name: &str, // No longer needed for the event payload itself
71        data: serde_json::Value,
72    ) {
73        // Extract the result from the JSON-RPC response if it exists
74        let result_value = if let Some(result) = data.get("result") {
75            // We've found the result field in the JSON-RPC response
76            // Now we need to return just the result instead of the full JSON-RPC response
77            result.clone()
78        } else {
79            // If there's no result field, just pass the data as-is
80            data
81        };
82
83        // Parse request_id to the appropriate type (number or string)
84        let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
85            serde_json::Value::Number(serde_json::Number::from(num_id))
86        } else {
87            serde_json::Value::String(request_id.to_string())
88        };
89
90        // Create the JSON-RPC response directly with the parsed ID
91        let json_rpc_response = serde_json::json!({
92            "jsonrpc": "2.0",
93            "id": id_value,
94            "result": result_value
95        });
96
97        // Serialize the JSON-RPC response
98        match serde_json::to_string(&json_rpc_response) {
99            Ok(json_data) => {
100                // Create SSE message with event type "message" and request_id as SSE id
101                let message = SSEMessage::new("message", &json_data, Some(request_id));
102
103                // Log the message for debugging
104                tracing::debug!(
105                    request_id = %request_id,
106                    message_data = %json_data,
107                    "Sending tool response via SSE"
108                );
109
110                self.send_sse_message(message, "message"); // Use helper to send
111            }
112            Err(e) => {
113                tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC response");
114            }
115        }
116    }
117
118    /// Send a tool error event using JSON-RPC format
119    pub fn send_tool_error(&self, request_id: &str, server_id: &str, tool_name: &str, error: &str) {
120        // Parse request_id to the appropriate type (number or string)
121        let id_value = if let Ok(num_id) = request_id.parse::<i64>() {
122            serde_json::Value::Number(serde_json::Number::from(num_id))
123        } else {
124            serde_json::Value::String(request_id.to_string())
125        };
126
127        // Construct a JSON-RPC error object
128        // Using the standard server error code from MCP spec
129        // Include more details in the 'data' field
130        let error_data = serde_json::json!({
131            "serverId": server_id,
132            "toolName": tool_name
133        });
134        let rpc_error = JsonRpcError {
135            code: error_codes::SERVER_ERROR,
136            message: error.to_string(),
137            data: Some(error_data),
138        };
139
140        // Construct a JSON-RPC error response
141        let response = JsonRpcResponse {
142            jsonrpc: JSON_RPC_VERSION.to_string(),
143            id: id_value,
144            result: None,
145            error: Some(rpc_error),
146        };
147
148        // Serialize the JSON-RPC response
149        match serde_json::to_string(&response) {
150            Ok(json_data) => {
151                // Create SSE message with event type "message" and request_id as SSE id
152                let message = SSEMessage::new("message", &json_data, Some(request_id));
153
154                // Log the message for debugging
155                tracing::debug!(
156                    request_id = %request_id,
157                    message_data = %json_data,
158                    "Sending tool error via SSE"
159                );
160
161                self.send_sse_message(message, "message"); // Use helper to send
162            }
163            Err(e) => {
164                tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC error response");
165            }
166        }
167    }
168
169    /// Send a server status update event
170    pub fn send_server_status(&self, server_name: &str, server_id: &str, status: &str) {
171        let event = SSEEvent::ServerStatus {
172            server_name: server_name.to_string(),
173            server_id: server_id.to_string(),
174            status: status.to_string(),
175        };
176        // Serialize event payload to JSON
177        match serde_json::to_string(&event) {
178            Ok(json_data) => {
179                // Create SSE message with specific event type
180                let message = SSEMessage::new("server-status", &json_data, None);
181                self.send_sse_message(message, "server-status"); // Use helper to send
182            }
183            Err(e) => {
184                tracing::error!(
185                    error = %e,
186                    event_type = "server-status",
187                    "Failed to serialize SSE event payload"
188                );
189            }
190        }
191    }
192
193    /// Send a notification event
194    pub fn send_notification(&self, title: &str, message: &str, level: &str) {
195        let event = SSEEvent::Notification {
196            title: title.to_string(),
197            message: message.to_string(),
198            level: level.to_string(),
199        };
200        // Serialize event payload to JSON
201        match serde_json::to_string(&event) {
202            Ok(json_data) => {
203                // Create SSE message with specific event type
204                let message = SSEMessage::new("notification", &json_data, None);
205                self.send_sse_message(message, "notification"); // Use helper to send
206            }
207            Err(e) => {
208                tracing::error!(
209                    error = %e,
210                    event_type = "notification",
211                    "Failed to serialize SSE event payload"
212                );
213            }
214        }
215    }
216
217    /// Helper to send an SSE message via the broadcast channel
218    /// (Replaces the old create_and_send_event logic for broadcasting)
219    fn send_sse_message(&self, message: SSEMessage, event_type_name: &str) {
220        match self.sender.send(message) {
221            Ok(receiver_count) => {
222                if receiver_count > 0 {
223                    tracing::debug!(
224                        event_type = event_type_name,
225                        receivers = receiver_count,
226                        "SSE event sent to clients"
227                    );
228                } else {
229                    tracing::trace!(
230                        event_type = event_type_name,
231                        "SSE event created but no clients connected"
232                    );
233                }
234            }
235            Err(e) => {
236                tracing::error!(
237                    error = %e,
238                    event_type = event_type_name,
239                    "Failed to broadcast SSE event"
240                );
241            }
242        }
243    }
244
245    /// Format an SSEMessage for the wire
246    pub fn format_sse_message(message: &SSEMessage) -> Bytes {
247        let mut result = String::new();
248
249        if let Some(id) = &message.id {
250            result.push_str(&format!("id: {}\n", id));
251        }
252
253        result.push_str(&format!("event: {}\n", message.event));
254        result.push_str(&format!("data: {}\n\n", message.data));
255
256        Bytes::from(result)
257    }
258}