turul_http_mcp_server/
notification_bridge.rs

//! Notification Bridge - connects `NotificationBroadcaster` to `StreamManager`.
//!
//! This module provides the critical bridge between the notification system
//! (where tools send events) and the SSE streaming system (where clients receive events).
//!
//! CRITICAL: All notifications MUST use the proper MCP JSON-RPC format per the specification:
//! `{"jsonrpc":"2.0","method":"notifications/{type}","params":{...}}`
//!
//! Without this bridge: Tools send notifications → NotificationBroadcaster → VOID
//! With this bridge: Tools send notifications → NotificationBroadcaster → StreamManager → SSE clients ✅
11
12use async_trait::async_trait;
13use std::sync::Arc;
14use tracing::{error, info};
15
16use crate::StreamManager;
17use turul_mcp_json_rpc_server::JsonRpcNotification;
18use turul_mcp_protocol::notifications::{
19    CancelledNotification, LoggingMessageNotification, ProgressNotification,
20    PromptListChangedNotification, ResourceListChangedNotification, ResourceUpdatedNotification,
21    ToolListChangedNotification,
22};
23
24/// MCP-compliant notification broadcaster trait for sending ALL notification types over SSE
25///
26/// ALL methods send proper JSON-RPC notifications per MCP 2025-06-18 specification
27#[async_trait]
28pub trait NotificationBroadcaster: Send + Sync {
29    // ================== SERVER-TO-CLIENT NOTIFICATIONS ==================
30
31    /// Send a progress notification (notifications/progress)
32    /// Used for long-running operations to show progress updates
33    async fn send_progress_notification(
34        &self,
35        session_id: &str,
36        notification: ProgressNotification,
37    ) -> Result<(), BroadcastError>;
38
39    /// Send a logging message notification (notifications/message)
40    /// Used to send log messages with different levels (debug, info, warning, error)
41    async fn send_message_notification(
42        &self,
43        session_id: &str,
44        notification: LoggingMessageNotification,
45    ) -> Result<(), BroadcastError>;
46
47    /// Send resource updated notification (notifications/resources/updated)
48    /// Notifies that a specific resource has been updated
49    async fn send_resource_updated_notification(
50        &self,
51        session_id: &str,
52        notification: ResourceUpdatedNotification,
53    ) -> Result<(), BroadcastError>;
54
55    /// Send resource list changed notification (notifications/resources/listChanged)
56    /// Notifies that the resource list has changed (added/removed resources)
57    async fn send_resource_list_changed_notification(
58        &self,
59        session_id: &str,
60        notification: ResourceListChangedNotification,
61    ) -> Result<(), BroadcastError>;
62
63    /// Send tool list changed notification (notifications/tools/listChanged)
64    /// Notifies that the tool list has changed (added/removed tools)
65    async fn send_tool_list_changed_notification(
66        &self,
67        session_id: &str,
68        notification: ToolListChangedNotification,
69    ) -> Result<(), BroadcastError>;
70
71    /// Send prompt list changed notification (notifications/prompts/listChanged)
72    /// Notifies that the prompt list has changed (added/removed prompts)
73    async fn send_prompt_list_changed_notification(
74        &self,
75        session_id: &str,
76        notification: PromptListChangedNotification,
77    ) -> Result<(), BroadcastError>;
78
79    // ================== BIDIRECTIONAL NOTIFICATIONS ==================
80
81    /// Send cancelled notification (notifications/cancelled)
82    /// Can be sent by either client or server to cancel a request
83    async fn send_cancelled_notification(
84        &self,
85        session_id: &str,
86        notification: CancelledNotification,
87    ) -> Result<(), BroadcastError>;
88
89    // ================== BROADCAST METHODS ==================
90
91    /// Broadcast any JSON-RPC notification to all active sessions (server-wide notifications)
92    async fn broadcast_to_all_sessions(
93        &self,
94        notification: JsonRpcNotification,
95    ) -> Result<Vec<String>, BroadcastError>;
96
97    /// Send any generic JSON-RPC notification to a specific session
98    async fn send_notification(
99        &self,
100        session_id: &str,
101        notification: JsonRpcNotification,
102    ) -> Result<(), BroadcastError>;
103}
104
105/// Errors that can occur during notification broadcasting
106#[derive(Debug, thiserror::Error)]
107pub enum BroadcastError {
108    #[error("Session not found: {0}")]
109    SessionNotFound(String),
110    #[error("Broadcasting failed: {0}")]
111    BroadcastFailed(String),
112    #[error("Serialization error: {0}")]
113    SerializationError(#[from] serde_json::Error),
114}
115
116/// StreamManager-backed notification broadcaster that bridges events to SSE
117///
118/// This implementation converts ALL MCP notification types to proper JSON-RPC format
119/// and forwards them to StreamManager for SSE delivery
120pub struct StreamManagerNotificationBroadcaster {
121    stream_manager: Arc<StreamManager>,
122}
123
124impl StreamManagerNotificationBroadcaster {
125    /// Create new broadcaster that forwards events to StreamManager
126    pub fn new(stream_manager: Arc<StreamManager>) -> Self {
127        Self { stream_manager }
128    }
129}
130
131// ================== CONVERSION HELPERS ==================
132// Helper functions to convert MCP notification types to JsonRpcNotification format
133
134/// Convert MCP notifications to proper JSON-RPC notifications
135pub mod conversion {
136    use super::*;
137    use std::collections::HashMap;
138
139    pub fn progress_to_json_rpc(notification: ProgressNotification) -> JsonRpcNotification {
140        let mut params = HashMap::new();
141        params.insert(
142            "progressToken".to_string(),
143            serde_json::json!(notification.params.progress_token),
144        );
145        params.insert(
146            "progress".to_string(),
147            serde_json::json!(notification.params.progress),
148        );
149        if let Some(total) = notification.params.total {
150            params.insert("total".to_string(), serde_json::json!(total));
151        }
152        if let Some(message) = notification.params.message {
153            params.insert("message".to_string(), serde_json::json!(message));
154        }
155        if let Some(meta) = notification.params.meta {
156            params.insert("_meta".to_string(), serde_json::json!(meta));
157        }
158
159        JsonRpcNotification::new_with_object_params(notification.method, params)
160    }
161
162    pub fn message_to_json_rpc(notification: LoggingMessageNotification) -> JsonRpcNotification {
163        let mut params = HashMap::new();
164        params.insert(
165            "level".to_string(),
166            serde_json::json!(notification.params.level),
167        );
168        params.insert("data".to_string(), notification.params.data);
169        if let Some(logger) = notification.params.logger {
170            params.insert("logger".to_string(), serde_json::json!(logger));
171        }
172        if let Some(meta) = notification.params.meta {
173            params.insert("_meta".to_string(), serde_json::json!(meta));
174        }
175
176        JsonRpcNotification::new_with_object_params(notification.method, params)
177    }
178
179    pub fn resource_updated_to_json_rpc(
180        notification: ResourceUpdatedNotification,
181    ) -> JsonRpcNotification {
182        let mut params = HashMap::new();
183        params.insert(
184            "uri".to_string(),
185            serde_json::json!(notification.params.uri),
186        );
187        if let Some(meta) = notification.params.meta {
188            params.insert("_meta".to_string(), serde_json::json!(meta));
189        }
190
191        JsonRpcNotification::new_with_object_params(notification.method, params)
192    }
193
194    pub fn resource_list_changed_to_json_rpc(
195        notification: ResourceListChangedNotification,
196    ) -> JsonRpcNotification {
197        if let Some(params) = notification.params {
198            if let Some(meta) = params.meta {
199                let mut param_map = HashMap::new();
200                param_map.insert("_meta".to_string(), serde_json::json!(meta));
201                JsonRpcNotification::new_with_object_params(notification.method, param_map)
202            } else {
203                JsonRpcNotification::new_no_params(notification.method)
204            }
205        } else {
206            JsonRpcNotification::new_no_params(notification.method)
207        }
208    }
209
210    pub fn tool_list_changed_to_json_rpc(
211        notification: ToolListChangedNotification,
212    ) -> JsonRpcNotification {
213        if let Some(params) = notification.params {
214            if let Some(meta) = params.meta {
215                let mut param_map = HashMap::new();
216                param_map.insert("_meta".to_string(), serde_json::json!(meta));
217                JsonRpcNotification::new_with_object_params(notification.method, param_map)
218            } else {
219                JsonRpcNotification::new_no_params(notification.method)
220            }
221        } else {
222            JsonRpcNotification::new_no_params(notification.method)
223        }
224    }
225
226    pub fn prompt_list_changed_to_json_rpc(
227        notification: PromptListChangedNotification,
228    ) -> JsonRpcNotification {
229        if let Some(params) = notification.params {
230            if let Some(meta) = params.meta {
231                let mut param_map = HashMap::new();
232                param_map.insert("_meta".to_string(), serde_json::json!(meta));
233                JsonRpcNotification::new_with_object_params(notification.method, param_map)
234            } else {
235                JsonRpcNotification::new_no_params(notification.method)
236            }
237        } else {
238            JsonRpcNotification::new_no_params(notification.method)
239        }
240    }
241
242    pub fn cancelled_to_json_rpc(notification: CancelledNotification) -> JsonRpcNotification {
243        let mut params = HashMap::new();
244        params.insert(
245            "requestId".to_string(),
246            serde_json::json!(notification.params.request_id),
247        );
248        if let Some(reason) = notification.params.reason {
249            params.insert("reason".to_string(), serde_json::json!(reason));
250        }
251        if let Some(meta) = notification.params.meta {
252            params.insert("_meta".to_string(), serde_json::json!(meta));
253        }
254
255        JsonRpcNotification::new_with_object_params(notification.method, params)
256    }
257}
258
259#[async_trait]
260impl NotificationBroadcaster for StreamManagerNotificationBroadcaster {
261    // ================== SERVER-TO-CLIENT NOTIFICATIONS ==================
262
263    async fn send_progress_notification(
264        &self,
265        session_id: &str,
266        notification: ProgressNotification,
267    ) -> Result<(), BroadcastError> {
268        let json_rpc_notification = conversion::progress_to_json_rpc(notification);
269        self.send_notification(session_id, json_rpc_notification)
270            .await
271    }
272
273    async fn send_message_notification(
274        &self,
275        session_id: &str,
276        notification: LoggingMessageNotification,
277    ) -> Result<(), BroadcastError> {
278        let json_rpc_notification = conversion::message_to_json_rpc(notification);
279        self.send_notification(session_id, json_rpc_notification)
280            .await
281    }
282
283    async fn send_resource_updated_notification(
284        &self,
285        session_id: &str,
286        notification: ResourceUpdatedNotification,
287    ) -> Result<(), BroadcastError> {
288        let json_rpc_notification = conversion::resource_updated_to_json_rpc(notification);
289        self.send_notification(session_id, json_rpc_notification)
290            .await
291    }
292
293    async fn send_resource_list_changed_notification(
294        &self,
295        session_id: &str,
296        notification: ResourceListChangedNotification,
297    ) -> Result<(), BroadcastError> {
298        let json_rpc_notification = conversion::resource_list_changed_to_json_rpc(notification);
299        self.send_notification(session_id, json_rpc_notification)
300            .await
301    }
302
303    async fn send_tool_list_changed_notification(
304        &self,
305        session_id: &str,
306        notification: ToolListChangedNotification,
307    ) -> Result<(), BroadcastError> {
308        let json_rpc_notification = conversion::tool_list_changed_to_json_rpc(notification);
309        self.send_notification(session_id, json_rpc_notification)
310            .await
311    }
312
313    async fn send_prompt_list_changed_notification(
314        &self,
315        session_id: &str,
316        notification: PromptListChangedNotification,
317    ) -> Result<(), BroadcastError> {
318        let json_rpc_notification = conversion::prompt_list_changed_to_json_rpc(notification);
319        self.send_notification(session_id, json_rpc_notification)
320            .await
321    }
322
323    // ================== BIDIRECTIONAL NOTIFICATIONS ==================
324
325    async fn send_cancelled_notification(
326        &self,
327        session_id: &str,
328        notification: CancelledNotification,
329    ) -> Result<(), BroadcastError> {
330        let json_rpc_notification = conversion::cancelled_to_json_rpc(notification);
331        self.send_notification(session_id, json_rpc_notification)
332            .await
333    }
334
335    // ================== BROADCAST METHODS ==================
336
337    async fn broadcast_to_all_sessions(
338        &self,
339        notification: JsonRpcNotification,
340    ) -> Result<Vec<String>, BroadcastError> {
341        // Convert JsonRpcNotification to SSE-formatted JSON
342        let sse_data =
343            serde_json::to_value(&notification).map_err(BroadcastError::SerializationError)?;
344
345        // Use StreamManager's built-in broadcast_to_all_sessions method
346        match self
347            .stream_manager
348            .broadcast_to_all_sessions(
349                notification.method.clone(), // Use MCP method name as event type
350                sse_data,
351            )
352            .await
353        {
354            Ok(failed_sessions) => {
355                info!(
356                    "📡 Broadcast JSON-RPC notification to all sessions: method={}, failed={}",
357                    notification.method,
358                    failed_sessions.len()
359                );
360                Ok(failed_sessions)
361            }
362            Err(e) => {
363                error!(
364                    "❌ Failed to broadcast JSON-RPC notification: method={}, error={}",
365                    notification.method, e
366                );
367                Err(BroadcastError::BroadcastFailed(e.to_string()))
368            }
369        }
370    }
371
372    async fn send_notification(
373        &self,
374        session_id: &str,
375        notification: JsonRpcNotification,
376    ) -> Result<(), BroadcastError> {
377        // Convert JsonRpcNotification to SSE-formatted JSON
378        let sse_data =
379            serde_json::to_value(&notification).map_err(BroadcastError::SerializationError)?;
380
381        // Send via StreamManager with proper JSON-RPC format
382        match self
383            .stream_manager
384            .broadcast_to_session(
385                session_id,
386                notification.method.clone(), // Use actual MCP method name as event type
387                sse_data,
388            )
389            .await
390        {
391            Ok(event_id) => {
392                info!(
393                    "✅ Sent JSON-RPC notification: session={}, method={}, event_id={}",
394                    session_id, notification.method, event_id
395                );
396                Ok(())
397            }
398            Err(e) => {
399                error!(
400                    "❌ Failed to send JSON-RPC notification: session={}, method={}, error={}",
401                    session_id, notification.method, e
402                );
403                Err(BroadcastError::BroadcastFailed(e.to_string()))
404            }
405        }
406    }
407}
408
409/// Shared NotificationBroadcaster type alias for use across the turul-http-mcp-server crate
410pub type SharedNotificationBroadcaster = Arc<dyn NotificationBroadcaster + Send + Sync>;