1use std::sync::Arc;
13use async_trait::async_trait;
14use tracing::{info, error};
15
16use turul_mcp_json_rpc_server::JsonRpcNotification;
17use turul_mcp_protocol::notifications::{
18 ProgressNotification, LoggingMessageNotification, ResourceUpdatedNotification,
19 ResourceListChangedNotification, ToolListChangedNotification,
20 PromptListChangedNotification, CancelledNotification
21};
22use crate::StreamManager;
23
24#[async_trait]
28pub trait NotificationBroadcaster: Send + Sync {
29 async fn send_progress_notification(
34 &self,
35 session_id: &str,
36 notification: ProgressNotification,
37 ) -> Result<(), BroadcastError>;
38
39 async fn send_message_notification(
42 &self,
43 session_id: &str,
44 notification: LoggingMessageNotification,
45 ) -> Result<(), BroadcastError>;
46
47 async fn send_resource_updated_notification(
50 &self,
51 session_id: &str,
52 notification: ResourceUpdatedNotification,
53 ) -> Result<(), BroadcastError>;
54
55 async fn send_resource_list_changed_notification(
58 &self,
59 session_id: &str,
60 notification: ResourceListChangedNotification,
61 ) -> Result<(), BroadcastError>;
62
63 async fn send_tool_list_changed_notification(
66 &self,
67 session_id: &str,
68 notification: ToolListChangedNotification,
69 ) -> Result<(), BroadcastError>;
70
71 async fn send_prompt_list_changed_notification(
74 &self,
75 session_id: &str,
76 notification: PromptListChangedNotification,
77 ) -> Result<(), BroadcastError>;
78
79 async fn send_cancelled_notification(
84 &self,
85 session_id: &str,
86 notification: CancelledNotification,
87 ) -> Result<(), BroadcastError>;
88
89 async fn broadcast_to_all_sessions(&self, notification: JsonRpcNotification) -> Result<Vec<String>, BroadcastError>;
93
94 async fn send_notification(
96 &self,
97 session_id: &str,
98 notification: JsonRpcNotification,
99 ) -> Result<(), BroadcastError>;
100}
101
102#[derive(Debug, thiserror::Error)]
104pub enum BroadcastError {
105 #[error("Session not found: {0}")]
106 SessionNotFound(String),
107 #[error("Broadcasting failed: {0}")]
108 BroadcastFailed(String),
109 #[error("Serialization error: {0}")]
110 SerializationError(#[from] serde_json::Error),
111}
112
113pub struct StreamManagerNotificationBroadcaster {
118 stream_manager: Arc<StreamManager>,
119}
120
121impl StreamManagerNotificationBroadcaster {
122 pub fn new(stream_manager: Arc<StreamManager>) -> Self {
124 Self { stream_manager }
125 }
126}
127
128pub mod conversion {
133 use super::*;
134 use std::collections::HashMap;
135
136 pub fn progress_to_json_rpc(notification: ProgressNotification) -> JsonRpcNotification {
137 let mut params = HashMap::new();
138 params.insert("progressToken".to_string(), serde_json::json!(notification.params.progress_token));
139 params.insert("progress".to_string(), serde_json::json!(notification.params.progress));
140 if let Some(total) = notification.params.total {
141 params.insert("total".to_string(), serde_json::json!(total));
142 }
143 if let Some(message) = notification.params.message {
144 params.insert("message".to_string(), serde_json::json!(message));
145 }
146 if let Some(meta) = notification.params.meta {
147 params.insert("_meta".to_string(), serde_json::json!(meta));
148 }
149
150 JsonRpcNotification::new_with_object_params(notification.method, params)
151 }
152
153 pub fn message_to_json_rpc(notification: LoggingMessageNotification) -> JsonRpcNotification {
154 let mut params = HashMap::new();
155 params.insert("level".to_string(), serde_json::json!(notification.params.level));
156 params.insert("data".to_string(), notification.params.data);
157 if let Some(logger) = notification.params.logger {
158 params.insert("logger".to_string(), serde_json::json!(logger));
159 }
160 if let Some(meta) = notification.params.meta {
161 params.insert("_meta".to_string(), serde_json::json!(meta));
162 }
163
164 JsonRpcNotification::new_with_object_params(notification.method, params)
165 }
166
167 pub fn resource_updated_to_json_rpc(notification: ResourceUpdatedNotification) -> JsonRpcNotification {
168 let mut params = HashMap::new();
169 params.insert("uri".to_string(), serde_json::json!(notification.params.uri));
170 if let Some(meta) = notification.params.meta {
171 params.insert("_meta".to_string(), serde_json::json!(meta));
172 }
173
174 JsonRpcNotification::new_with_object_params(notification.method, params)
175 }
176
177 pub fn resource_list_changed_to_json_rpc(notification: ResourceListChangedNotification) -> JsonRpcNotification {
178 if let Some(params) = notification.params {
179 if let Some(meta) = params.meta {
180 let mut param_map = HashMap::new();
181 param_map.insert("_meta".to_string(), serde_json::json!(meta));
182 JsonRpcNotification::new_with_object_params(notification.method, param_map)
183 } else {
184 JsonRpcNotification::new_no_params(notification.method)
185 }
186 } else {
187 JsonRpcNotification::new_no_params(notification.method)
188 }
189 }
190
191 pub fn tool_list_changed_to_json_rpc(notification: ToolListChangedNotification) -> JsonRpcNotification {
192 if let Some(params) = notification.params {
193 if let Some(meta) = params.meta {
194 let mut param_map = HashMap::new();
195 param_map.insert("_meta".to_string(), serde_json::json!(meta));
196 JsonRpcNotification::new_with_object_params(notification.method, param_map)
197 } else {
198 JsonRpcNotification::new_no_params(notification.method)
199 }
200 } else {
201 JsonRpcNotification::new_no_params(notification.method)
202 }
203 }
204
205 pub fn prompt_list_changed_to_json_rpc(notification: PromptListChangedNotification) -> JsonRpcNotification {
206 if let Some(params) = notification.params {
207 if let Some(meta) = params.meta {
208 let mut param_map = HashMap::new();
209 param_map.insert("_meta".to_string(), serde_json::json!(meta));
210 JsonRpcNotification::new_with_object_params(notification.method, param_map)
211 } else {
212 JsonRpcNotification::new_no_params(notification.method)
213 }
214 } else {
215 JsonRpcNotification::new_no_params(notification.method)
216 }
217 }
218
219 pub fn cancelled_to_json_rpc(notification: CancelledNotification) -> JsonRpcNotification {
220 let mut params = HashMap::new();
221 params.insert("requestId".to_string(), serde_json::json!(notification.params.request_id));
222 if let Some(reason) = notification.params.reason {
223 params.insert("reason".to_string(), serde_json::json!(reason));
224 }
225 if let Some(meta) = notification.params.meta {
226 params.insert("_meta".to_string(), serde_json::json!(meta));
227 }
228
229 JsonRpcNotification::new_with_object_params(notification.method, params)
230 }
231}
232
233#[async_trait]
234impl NotificationBroadcaster for StreamManagerNotificationBroadcaster {
235 async fn send_progress_notification(
238 &self,
239 session_id: &str,
240 notification: ProgressNotification,
241 ) -> Result<(), BroadcastError> {
242 let json_rpc_notification = conversion::progress_to_json_rpc(notification);
243 self.send_notification(session_id, json_rpc_notification).await
244 }
245
246 async fn send_message_notification(
247 &self,
248 session_id: &str,
249 notification: LoggingMessageNotification,
250 ) -> Result<(), BroadcastError> {
251 let json_rpc_notification = conversion::message_to_json_rpc(notification);
252 self.send_notification(session_id, json_rpc_notification).await
253 }
254
255 async fn send_resource_updated_notification(
256 &self,
257 session_id: &str,
258 notification: ResourceUpdatedNotification,
259 ) -> Result<(), BroadcastError> {
260 let json_rpc_notification = conversion::resource_updated_to_json_rpc(notification);
261 self.send_notification(session_id, json_rpc_notification).await
262 }
263
264 async fn send_resource_list_changed_notification(
265 &self,
266 session_id: &str,
267 notification: ResourceListChangedNotification,
268 ) -> Result<(), BroadcastError> {
269 let json_rpc_notification = conversion::resource_list_changed_to_json_rpc(notification);
270 self.send_notification(session_id, json_rpc_notification).await
271 }
272
273 async fn send_tool_list_changed_notification(
274 &self,
275 session_id: &str,
276 notification: ToolListChangedNotification,
277 ) -> Result<(), BroadcastError> {
278 let json_rpc_notification = conversion::tool_list_changed_to_json_rpc(notification);
279 self.send_notification(session_id, json_rpc_notification).await
280 }
281
282 async fn send_prompt_list_changed_notification(
283 &self,
284 session_id: &str,
285 notification: PromptListChangedNotification,
286 ) -> Result<(), BroadcastError> {
287 let json_rpc_notification = conversion::prompt_list_changed_to_json_rpc(notification);
288 self.send_notification(session_id, json_rpc_notification).await
289 }
290
291 async fn send_cancelled_notification(
294 &self,
295 session_id: &str,
296 notification: CancelledNotification,
297 ) -> Result<(), BroadcastError> {
298 let json_rpc_notification = conversion::cancelled_to_json_rpc(notification);
299 self.send_notification(session_id, json_rpc_notification).await
300 }
301
302 async fn broadcast_to_all_sessions(&self, notification: JsonRpcNotification) -> Result<Vec<String>, BroadcastError> {
305 let sse_data = serde_json::to_value(¬ification)
307 .map_err(|e| BroadcastError::SerializationError(e))?;
308
309 match self.stream_manager.broadcast_to_all_sessions(
311 notification.method.clone(), sse_data
313 ).await {
314 Ok(failed_sessions) => {
315 info!("📡 Broadcast JSON-RPC notification to all sessions: method={}, failed={}",
316 notification.method, failed_sessions.len());
317 Ok(failed_sessions)
318 }
319 Err(e) => {
320 error!("❌ Failed to broadcast JSON-RPC notification: method={}, error={}",
321 notification.method, e);
322 Err(BroadcastError::BroadcastFailed(e.to_string()))
323 }
324 }
325 }
326
327 async fn send_notification(
328 &self,
329 session_id: &str,
330 notification: JsonRpcNotification,
331 ) -> Result<(), BroadcastError> {
332 let sse_data = serde_json::to_value(¬ification)
334 .map_err(|e| BroadcastError::SerializationError(e))?;
335
336 match self.stream_manager.broadcast_to_session(
338 session_id,
339 notification.method.clone(), sse_data
341 ).await {
342 Ok(event_id) => {
343 info!("✅ Sent JSON-RPC notification: session={}, method={}, event_id={}",
344 session_id, notification.method, event_id);
345 Ok(())
346 }
347 Err(e) => {
348 error!("❌ Failed to send JSON-RPC notification: session={}, method={}, error={}",
349 session_id, notification.method, e);
350 Err(BroadcastError::BroadcastFailed(e.to_string()))
351 }
352 }
353 }
354}
355
356pub type SharedNotificationBroadcaster = Arc<dyn NotificationBroadcaster + Send + Sync>;