turul_http_mcp_server/
notification_bridge.rs

//! Notification Bridge - Connects NotificationBroadcaster to StreamManager
//!
//! This module provides the critical bridge between the notification system
//! (where tools send events) and the SSE streaming system (where clients receive events).
//!
//! CRITICAL: All notifications MUST use proper MCP JSON-RPC format per specification:
//! `{"jsonrpc":"2.0","method":"notifications/{type}","params":{...}}`
//!
//! Without this bridge: Tools send notifications → NotificationBroadcaster → VOID
//! With this bridge: Tools send notifications → NotificationBroadcaster → StreamManager → SSE clients ✅
11
12use std::sync::Arc;
13use async_trait::async_trait;
14use tracing::{info, error};
15
16use turul_mcp_json_rpc_server::JsonRpcNotification;
17use turul_mcp_protocol::notifications::{
18    ProgressNotification, LoggingMessageNotification, ResourceUpdatedNotification,
19    ResourceListChangedNotification, ToolListChangedNotification,
20    PromptListChangedNotification, CancelledNotification
21};
22use crate::StreamManager;
23
24/// MCP-compliant notification broadcaster trait for sending ALL notification types over SSE
25///
26/// ALL methods send proper JSON-RPC notifications per MCP 2025-06-18 specification
27#[async_trait]
28pub trait NotificationBroadcaster: Send + Sync {
29    // ================== SERVER-TO-CLIENT NOTIFICATIONS ==================
30
31    /// Send a progress notification (notifications/progress)
32    /// Used for long-running operations to show progress updates
33    async fn send_progress_notification(
34        &self,
35        session_id: &str,
36        notification: ProgressNotification,
37    ) -> Result<(), BroadcastError>;
38
39    /// Send a logging message notification (notifications/message)
40    /// Used to send log messages with different levels (debug, info, warning, error)
41    async fn send_message_notification(
42        &self,
43        session_id: &str,
44        notification: LoggingMessageNotification,
45    ) -> Result<(), BroadcastError>;
46
47    /// Send resource updated notification (notifications/resources/updated)
48    /// Notifies that a specific resource has been updated
49    async fn send_resource_updated_notification(
50        &self,
51        session_id: &str,
52        notification: ResourceUpdatedNotification,
53    ) -> Result<(), BroadcastError>;
54
55    /// Send resource list changed notification (notifications/resources/list_changed)
56    /// Notifies that the resource list has changed (added/removed resources)
57    async fn send_resource_list_changed_notification(
58        &self,
59        session_id: &str,
60        notification: ResourceListChangedNotification,
61    ) -> Result<(), BroadcastError>;
62
63    /// Send tool list changed notification (notifications/tools/list_changed)
64    /// Notifies that the tool list has changed (added/removed tools)
65    async fn send_tool_list_changed_notification(
66        &self,
67        session_id: &str,
68        notification: ToolListChangedNotification,
69    ) -> Result<(), BroadcastError>;
70
71    /// Send prompt list changed notification (notifications/prompts/list_changed)
72    /// Notifies that the prompt list has changed (added/removed prompts)
73    async fn send_prompt_list_changed_notification(
74        &self,
75        session_id: &str,
76        notification: PromptListChangedNotification,
77    ) -> Result<(), BroadcastError>;
78
79    // ================== BIDIRECTIONAL NOTIFICATIONS ==================
80
81    /// Send cancelled notification (notifications/cancelled)
82    /// Can be sent by either client or server to cancel a request
83    async fn send_cancelled_notification(
84        &self,
85        session_id: &str,
86        notification: CancelledNotification,
87    ) -> Result<(), BroadcastError>;
88
89    // ================== BROADCAST METHODS ==================
90
91    /// Broadcast any JSON-RPC notification to all active sessions (server-wide notifications)
92    async fn broadcast_to_all_sessions(&self, notification: JsonRpcNotification) -> Result<Vec<String>, BroadcastError>;
93
94    /// Send any generic JSON-RPC notification to a specific session
95    async fn send_notification(
96        &self,
97        session_id: &str,
98        notification: JsonRpcNotification,
99    ) -> Result<(), BroadcastError>;
100}
101
102/// Errors that can occur during notification broadcasting
103#[derive(Debug, thiserror::Error)]
104pub enum BroadcastError {
105    #[error("Session not found: {0}")]
106    SessionNotFound(String),
107    #[error("Broadcasting failed: {0}")]
108    BroadcastFailed(String),
109    #[error("Serialization error: {0}")]
110    SerializationError(#[from] serde_json::Error),
111}
112
113/// StreamManager-backed notification broadcaster that bridges events to SSE
114///
115/// This implementation converts ALL MCP notification types to proper JSON-RPC format
116/// and forwards them to StreamManager for SSE delivery
117pub struct StreamManagerNotificationBroadcaster {
118    stream_manager: Arc<StreamManager>,
119}
120
121impl StreamManagerNotificationBroadcaster {
122    /// Create new broadcaster that forwards events to StreamManager
123    pub fn new(stream_manager: Arc<StreamManager>) -> Self {
124        Self { stream_manager }
125    }
126}
127
128// ================== CONVERSION HELPERS ==================
129// Helper functions to convert MCP notification types to JsonRpcNotification format
130
131/// Convert MCP notifications to proper JSON-RPC notifications
132pub mod conversion {
133    use super::*;
134    use std::collections::HashMap;
135
136    pub fn progress_to_json_rpc(notification: ProgressNotification) -> JsonRpcNotification {
137        let mut params = HashMap::new();
138        params.insert("progressToken".to_string(), serde_json::json!(notification.params.progress_token));
139        params.insert("progress".to_string(), serde_json::json!(notification.params.progress));
140        if let Some(total) = notification.params.total {
141            params.insert("total".to_string(), serde_json::json!(total));
142        }
143        if let Some(message) = notification.params.message {
144            params.insert("message".to_string(), serde_json::json!(message));
145        }
146        if let Some(meta) = notification.params.meta {
147            params.insert("_meta".to_string(), serde_json::json!(meta));
148        }
149
150        JsonRpcNotification::new_with_object_params(notification.method, params)
151    }
152
153    pub fn message_to_json_rpc(notification: LoggingMessageNotification) -> JsonRpcNotification {
154        let mut params = HashMap::new();
155        params.insert("level".to_string(), serde_json::json!(notification.params.level));
156        params.insert("data".to_string(), notification.params.data);
157        if let Some(logger) = notification.params.logger {
158            params.insert("logger".to_string(), serde_json::json!(logger));
159        }
160        if let Some(meta) = notification.params.meta {
161            params.insert("_meta".to_string(), serde_json::json!(meta));
162        }
163
164        JsonRpcNotification::new_with_object_params(notification.method, params)
165    }
166
167    pub fn resource_updated_to_json_rpc(notification: ResourceUpdatedNotification) -> JsonRpcNotification {
168        let mut params = HashMap::new();
169        params.insert("uri".to_string(), serde_json::json!(notification.params.uri));
170        if let Some(meta) = notification.params.meta {
171            params.insert("_meta".to_string(), serde_json::json!(meta));
172        }
173
174        JsonRpcNotification::new_with_object_params(notification.method, params)
175    }
176
177    pub fn resource_list_changed_to_json_rpc(notification: ResourceListChangedNotification) -> JsonRpcNotification {
178        if let Some(params) = notification.params {
179            if let Some(meta) = params.meta {
180                let mut param_map = HashMap::new();
181                param_map.insert("_meta".to_string(), serde_json::json!(meta));
182                JsonRpcNotification::new_with_object_params(notification.method, param_map)
183            } else {
184                JsonRpcNotification::new_no_params(notification.method)
185            }
186        } else {
187            JsonRpcNotification::new_no_params(notification.method)
188        }
189    }
190
191    pub fn tool_list_changed_to_json_rpc(notification: ToolListChangedNotification) -> JsonRpcNotification {
192        if let Some(params) = notification.params {
193            if let Some(meta) = params.meta {
194                let mut param_map = HashMap::new();
195                param_map.insert("_meta".to_string(), serde_json::json!(meta));
196                JsonRpcNotification::new_with_object_params(notification.method, param_map)
197            } else {
198                JsonRpcNotification::new_no_params(notification.method)
199            }
200        } else {
201            JsonRpcNotification::new_no_params(notification.method)
202        }
203    }
204
205    pub fn prompt_list_changed_to_json_rpc(notification: PromptListChangedNotification) -> JsonRpcNotification {
206        if let Some(params) = notification.params {
207            if let Some(meta) = params.meta {
208                let mut param_map = HashMap::new();
209                param_map.insert("_meta".to_string(), serde_json::json!(meta));
210                JsonRpcNotification::new_with_object_params(notification.method, param_map)
211            } else {
212                JsonRpcNotification::new_no_params(notification.method)
213            }
214        } else {
215            JsonRpcNotification::new_no_params(notification.method)
216        }
217    }
218
219    pub fn cancelled_to_json_rpc(notification: CancelledNotification) -> JsonRpcNotification {
220        let mut params = HashMap::new();
221        params.insert("requestId".to_string(), serde_json::json!(notification.params.request_id));
222        if let Some(reason) = notification.params.reason {
223            params.insert("reason".to_string(), serde_json::json!(reason));
224        }
225        if let Some(meta) = notification.params.meta {
226            params.insert("_meta".to_string(), serde_json::json!(meta));
227        }
228
229        JsonRpcNotification::new_with_object_params(notification.method, params)
230    }
231}
232
233#[async_trait]
234impl NotificationBroadcaster for StreamManagerNotificationBroadcaster {
235    // ================== SERVER-TO-CLIENT NOTIFICATIONS ==================
236
237    async fn send_progress_notification(
238        &self,
239        session_id: &str,
240        notification: ProgressNotification,
241    ) -> Result<(), BroadcastError> {
242        let json_rpc_notification = conversion::progress_to_json_rpc(notification);
243        self.send_notification(session_id, json_rpc_notification).await
244    }
245
246    async fn send_message_notification(
247        &self,
248        session_id: &str,
249        notification: LoggingMessageNotification,
250    ) -> Result<(), BroadcastError> {
251        let json_rpc_notification = conversion::message_to_json_rpc(notification);
252        self.send_notification(session_id, json_rpc_notification).await
253    }
254
255    async fn send_resource_updated_notification(
256        &self,
257        session_id: &str,
258        notification: ResourceUpdatedNotification,
259    ) -> Result<(), BroadcastError> {
260        let json_rpc_notification = conversion::resource_updated_to_json_rpc(notification);
261        self.send_notification(session_id, json_rpc_notification).await
262    }
263
264    async fn send_resource_list_changed_notification(
265        &self,
266        session_id: &str,
267        notification: ResourceListChangedNotification,
268    ) -> Result<(), BroadcastError> {
269        let json_rpc_notification = conversion::resource_list_changed_to_json_rpc(notification);
270        self.send_notification(session_id, json_rpc_notification).await
271    }
272
273    async fn send_tool_list_changed_notification(
274        &self,
275        session_id: &str,
276        notification: ToolListChangedNotification,
277    ) -> Result<(), BroadcastError> {
278        let json_rpc_notification = conversion::tool_list_changed_to_json_rpc(notification);
279        self.send_notification(session_id, json_rpc_notification).await
280    }
281
282    async fn send_prompt_list_changed_notification(
283        &self,
284        session_id: &str,
285        notification: PromptListChangedNotification,
286    ) -> Result<(), BroadcastError> {
287        let json_rpc_notification = conversion::prompt_list_changed_to_json_rpc(notification);
288        self.send_notification(session_id, json_rpc_notification).await
289    }
290
291    // ================== BIDIRECTIONAL NOTIFICATIONS ==================
292
293    async fn send_cancelled_notification(
294        &self,
295        session_id: &str,
296        notification: CancelledNotification,
297    ) -> Result<(), BroadcastError> {
298        let json_rpc_notification = conversion::cancelled_to_json_rpc(notification);
299        self.send_notification(session_id, json_rpc_notification).await
300    }
301
302    // ================== BROADCAST METHODS ==================
303
304    async fn broadcast_to_all_sessions(&self, notification: JsonRpcNotification) -> Result<Vec<String>, BroadcastError> {
305        // Convert JsonRpcNotification to SSE-formatted JSON
306        let sse_data = serde_json::to_value(&notification)
307            .map_err(|e| BroadcastError::SerializationError(e))?;
308
309        // Use StreamManager's built-in broadcast_to_all_sessions method
310        match self.stream_manager.broadcast_to_all_sessions(
311            notification.method.clone(), // Use MCP method name as event type
312            sse_data
313        ).await {
314            Ok(failed_sessions) => {
315                info!("📡 Broadcast JSON-RPC notification to all sessions: method={}, failed={}",
316                      notification.method, failed_sessions.len());
317                Ok(failed_sessions)
318            }
319            Err(e) => {
320                error!("❌ Failed to broadcast JSON-RPC notification: method={}, error={}",
321                       notification.method, e);
322                Err(BroadcastError::BroadcastFailed(e.to_string()))
323            }
324        }
325    }
326
327    async fn send_notification(
328        &self,
329        session_id: &str,
330        notification: JsonRpcNotification,
331    ) -> Result<(), BroadcastError> {
332        // Convert JsonRpcNotification to SSE-formatted JSON
333        let sse_data = serde_json::to_value(&notification)
334            .map_err(|e| BroadcastError::SerializationError(e))?;
335
336        // Send via StreamManager with proper JSON-RPC format
337        match self.stream_manager.broadcast_to_session(
338            session_id,
339            notification.method.clone(), // Use actual MCP method name as event type
340            sse_data
341        ).await {
342            Ok(event_id) => {
343                info!("✅ Sent JSON-RPC notification: session={}, method={}, event_id={}",
344                      session_id, notification.method, event_id);
345                Ok(())
346            }
347            Err(e) => {
348                error!("❌ Failed to send JSON-RPC notification: session={}, method={}, error={}",
349                       session_id, notification.method, e);
350                Err(BroadcastError::BroadcastFailed(e.to_string()))
351            }
352        }
353    }
354}
355
356/// Shared NotificationBroadcaster type alias for use across the turul-http-mcp-server crate
357pub type SharedNotificationBroadcaster = Arc<dyn NotificationBroadcaster + Send + Sync>;