mcp_runner/sse_proxy/
events.rs

1//! SSE event management for the Actix Web-based proxy.
2//!
3//! This module handles the broadcasting of Server-Sent Events (SSE) to connected clients
4//! using Actix Web's streaming capabilities.
5
6use crate::sse_proxy::types::{SSEEvent, SSEMessage};
7use crate::transport::json_rpc::{JSON_RPC_VERSION, JsonRpcError, JsonRpcResponse}; // Import JSON-RPC types
8use actix_web::web::Bytes;
9use tokio::sync::broadcast;
10use tracing;
11
12/// Handles SSE event processing and broadcasting to connected clients
13///
14/// This struct manages the broadcasting of events to multiple SSE clients using a
15/// Tokio broadcast channel. It provides methods for sending different types of events
16/// and handling SSE connections.
17#[derive(Clone)]
18pub struct EventManager {
19    /// Broadcast channel for sending events to all connected clients
20    sender: broadcast::Sender<SSEMessage>,
21}
22
23impl EventManager {
24    /// Create a new EventManager with the specified channel capacity
25    pub fn new(capacity: usize) -> Self {
26        let (sender, _) = broadcast::channel(capacity);
27        Self { sender }
28    }
29
30    /// Subscribe to events
31    pub fn subscribe(&self) -> broadcast::Receiver<SSEMessage> {
32        self.sender.subscribe()
33    }
34
35    /// Send a tool response event using JSON-RPC format
36    pub fn send_tool_response(
37        &self,
38        request_id: &str,
39        _server_id: &str, // No longer needed for the event payload itself
40        _tool_name: &str, // No longer needed for the event payload itself
41        data: serde_json::Value,
42    ) {
43        // Construct a JSON-RPC success response
44        let response = JsonRpcResponse::success(request_id, data);
45
46        // Serialize the JSON-RPC response
47        match serde_json::to_string(&response) {
48            Ok(json_data) => {
49                // Create SSE message with event type "message" and request_id as SSE id
50                let message = SSEMessage::new("message", &json_data, Some(request_id));
51                self.send_sse_message(message, "message"); // Use helper to send
52            }
53            Err(e) => {
54                tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC success response");
55            }
56        }
57    }
58
59    /// Send a tool error event using JSON-RPC format
60    pub fn send_tool_error(&self, request_id: &str, server_id: &str, tool_name: &str, error: &str) {
61        // Construct a JSON-RPC error object
62        // Using a generic error code -32000 for server error
63        // Optionally include more details in the 'data' field
64        let error_data = serde_json::json!({
65            "serverId": server_id,
66            "toolName": tool_name
67        });
68        let rpc_error = JsonRpcError {
69            code: -32000, // Example: Generic server error code
70            message: error.to_string(),
71            data: Some(error_data),
72        };
73
74        // Construct a JSON-RPC error response
75        let response = JsonRpcResponse {
76            jsonrpc: JSON_RPC_VERSION.to_string(),
77            id: serde_json::Value::String(request_id.to_string()),
78            result: None,
79            error: Some(rpc_error),
80        };
81
82        // Serialize the JSON-RPC response
83        match serde_json::to_string(&response) {
84            Ok(json_data) => {
85                // Create SSE message with event type "message" and request_id as SSE id
86                let message = SSEMessage::new("message", &json_data, Some(request_id));
87                self.send_sse_message(message, "message"); // Use helper to send
88            }
89            Err(e) => {
90                tracing::error!(error = %e, request_id = %request_id, "Failed to serialize JSON-RPC error response");
91            }
92        }
93    }
94
95    /// Send a server status update event
96    pub fn send_server_status(&self, server_name: &str, server_id: &str, status: &str) {
97        let event = SSEEvent::ServerStatus {
98            server_name: server_name.to_string(),
99            server_id: server_id.to_string(),
100            status: status.to_string(),
101        };
102        // Serialize event payload to JSON
103        match serde_json::to_string(&event) {
104            Ok(json_data) => {
105                // Create SSE message with specific event type
106                let message = SSEMessage::new("server-status", &json_data, None);
107                self.send_sse_message(message, "server-status"); // Use helper to send
108            }
109            Err(e) => {
110                tracing::error!(
111                    error = %e,
112                    event_type = "server-status",
113                    "Failed to serialize SSE event payload"
114                );
115            }
116        }
117    }
118
119    /// Send a notification event
120    pub fn send_notification(&self, title: &str, message: &str, level: &str) {
121        let event = SSEEvent::Notification {
122            title: title.to_string(),
123            message: message.to_string(),
124            level: level.to_string(),
125        };
126        // Serialize event payload to JSON
127        match serde_json::to_string(&event) {
128            Ok(json_data) => {
129                // Create SSE message with specific event type
130                let message = SSEMessage::new("notification", &json_data, None);
131                self.send_sse_message(message, "notification"); // Use helper to send
132            }
133            Err(e) => {
134                tracing::error!(
135                    error = %e,
136                    event_type = "notification",
137                    "Failed to serialize SSE event payload"
138                );
139            }
140        }
141    }
142
143    /// Helper to send an SSE message via the broadcast channel
144    /// (Replaces the old create_and_send_event logic for broadcasting)
145    fn send_sse_message(&self, message: SSEMessage, event_type_name: &str) {
146        match self.sender.send(message) {
147            Ok(receiver_count) => {
148                if receiver_count > 0 {
149                    tracing::debug!(
150                        event_type = event_type_name,
151                        receivers = receiver_count,
152                        "SSE event sent to clients"
153                    );
154                } else {
155                    tracing::trace!(
156                        event_type = event_type_name,
157                        "SSE event created but no clients connected"
158                    );
159                }
160            }
161            Err(e) => {
162                tracing::error!(
163                    error = %e,
164                    event_type = event_type_name,
165                    "Failed to broadcast SSE event"
166                );
167            }
168        }
169    }
170
171    /// Format an SSEMessage for the wire
172    pub fn format_sse_message(message: &SSEMessage) -> Bytes {
173        let mut result = String::new();
174
175        if let Some(id) = &message.id {
176            result.push_str(&format!("id: {}\n", id));
177        }
178
179        result.push_str(&format!("event: {}\n", message.event));
180        result.push_str(&format!("data: {}\n\n", message.data));
181
182        Bytes::from(result)
183    }
184}