1use async_trait::async_trait;
13use std::sync::Arc;
14use tracing::{error, info};
15
16use crate::StreamManager;
17use turul_mcp_json_rpc_server::JsonRpcNotification;
18use turul_mcp_protocol::notifications::{
19 CancelledNotification, LoggingMessageNotification, ProgressNotification,
20 PromptListChangedNotification, ResourceListChangedNotification, ResourceUpdatedNotification,
21 ToolListChangedNotification,
22};
23
24#[async_trait]
28pub trait NotificationBroadcaster: Send + Sync {
29 async fn send_progress_notification(
34 &self,
35 session_id: &str,
36 notification: ProgressNotification,
37 ) -> Result<(), BroadcastError>;
38
39 async fn send_message_notification(
42 &self,
43 session_id: &str,
44 notification: LoggingMessageNotification,
45 ) -> Result<(), BroadcastError>;
46
47 async fn send_resource_updated_notification(
50 &self,
51 session_id: &str,
52 notification: ResourceUpdatedNotification,
53 ) -> Result<(), BroadcastError>;
54
55 async fn send_resource_list_changed_notification(
58 &self,
59 session_id: &str,
60 notification: ResourceListChangedNotification,
61 ) -> Result<(), BroadcastError>;
62
63 async fn send_tool_list_changed_notification(
66 &self,
67 session_id: &str,
68 notification: ToolListChangedNotification,
69 ) -> Result<(), BroadcastError>;
70
71 async fn send_prompt_list_changed_notification(
74 &self,
75 session_id: &str,
76 notification: PromptListChangedNotification,
77 ) -> Result<(), BroadcastError>;
78
79 async fn send_cancelled_notification(
84 &self,
85 session_id: &str,
86 notification: CancelledNotification,
87 ) -> Result<(), BroadcastError>;
88
89 async fn broadcast_to_all_sessions(
93 &self,
94 notification: JsonRpcNotification,
95 ) -> Result<Vec<String>, BroadcastError>;
96
97 async fn send_notification(
99 &self,
100 session_id: &str,
101 notification: JsonRpcNotification,
102 ) -> Result<(), BroadcastError>;
103}
104
105#[derive(Debug, thiserror::Error)]
107pub enum BroadcastError {
108 #[error("Session not found: {0}")]
109 SessionNotFound(String),
110 #[error("Broadcasting failed: {0}")]
111 BroadcastFailed(String),
112 #[error("Serialization error: {0}")]
113 SerializationError(#[from] serde_json::Error),
114}
115
116pub struct StreamManagerNotificationBroadcaster {
121 stream_manager: Arc<StreamManager>,
122}
123
124impl StreamManagerNotificationBroadcaster {
125 pub fn new(stream_manager: Arc<StreamManager>) -> Self {
127 Self { stream_manager }
128 }
129}
130
131pub mod conversion {
136 use super::*;
137 use std::collections::HashMap;
138
139 pub fn progress_to_json_rpc(notification: ProgressNotification) -> JsonRpcNotification {
140 let mut params = HashMap::new();
141 params.insert(
142 "progressToken".to_string(),
143 serde_json::json!(notification.params.progress_token),
144 );
145 params.insert(
146 "progress".to_string(),
147 serde_json::json!(notification.params.progress),
148 );
149 if let Some(total) = notification.params.total {
150 params.insert("total".to_string(), serde_json::json!(total));
151 }
152 if let Some(message) = notification.params.message {
153 params.insert("message".to_string(), serde_json::json!(message));
154 }
155 if let Some(meta) = notification.params.meta {
156 params.insert("_meta".to_string(), serde_json::json!(meta));
157 }
158
159 JsonRpcNotification::new_with_object_params(notification.method, params)
160 }
161
162 pub fn message_to_json_rpc(notification: LoggingMessageNotification) -> JsonRpcNotification {
163 let mut params = HashMap::new();
164 params.insert(
165 "level".to_string(),
166 serde_json::json!(notification.params.level),
167 );
168 params.insert("data".to_string(), notification.params.data);
169 if let Some(logger) = notification.params.logger {
170 params.insert("logger".to_string(), serde_json::json!(logger));
171 }
172 if let Some(meta) = notification.params.meta {
173 params.insert("_meta".to_string(), serde_json::json!(meta));
174 }
175
176 JsonRpcNotification::new_with_object_params(notification.method, params)
177 }
178
179 pub fn resource_updated_to_json_rpc(
180 notification: ResourceUpdatedNotification,
181 ) -> JsonRpcNotification {
182 let mut params = HashMap::new();
183 params.insert(
184 "uri".to_string(),
185 serde_json::json!(notification.params.uri),
186 );
187 if let Some(meta) = notification.params.meta {
188 params.insert("_meta".to_string(), serde_json::json!(meta));
189 }
190
191 JsonRpcNotification::new_with_object_params(notification.method, params)
192 }
193
194 pub fn resource_list_changed_to_json_rpc(
195 notification: ResourceListChangedNotification,
196 ) -> JsonRpcNotification {
197 if let Some(params) = notification.params {
198 if let Some(meta) = params.meta {
199 let mut param_map = HashMap::new();
200 param_map.insert("_meta".to_string(), serde_json::json!(meta));
201 JsonRpcNotification::new_with_object_params(notification.method, param_map)
202 } else {
203 JsonRpcNotification::new_no_params(notification.method)
204 }
205 } else {
206 JsonRpcNotification::new_no_params(notification.method)
207 }
208 }
209
210 pub fn tool_list_changed_to_json_rpc(
211 notification: ToolListChangedNotification,
212 ) -> JsonRpcNotification {
213 if let Some(params) = notification.params {
214 if let Some(meta) = params.meta {
215 let mut param_map = HashMap::new();
216 param_map.insert("_meta".to_string(), serde_json::json!(meta));
217 JsonRpcNotification::new_with_object_params(notification.method, param_map)
218 } else {
219 JsonRpcNotification::new_no_params(notification.method)
220 }
221 } else {
222 JsonRpcNotification::new_no_params(notification.method)
223 }
224 }
225
226 pub fn prompt_list_changed_to_json_rpc(
227 notification: PromptListChangedNotification,
228 ) -> JsonRpcNotification {
229 if let Some(params) = notification.params {
230 if let Some(meta) = params.meta {
231 let mut param_map = HashMap::new();
232 param_map.insert("_meta".to_string(), serde_json::json!(meta));
233 JsonRpcNotification::new_with_object_params(notification.method, param_map)
234 } else {
235 JsonRpcNotification::new_no_params(notification.method)
236 }
237 } else {
238 JsonRpcNotification::new_no_params(notification.method)
239 }
240 }
241
242 pub fn cancelled_to_json_rpc(notification: CancelledNotification) -> JsonRpcNotification {
243 let mut params = HashMap::new();
244 params.insert(
245 "requestId".to_string(),
246 serde_json::json!(notification.params.request_id),
247 );
248 if let Some(reason) = notification.params.reason {
249 params.insert("reason".to_string(), serde_json::json!(reason));
250 }
251 if let Some(meta) = notification.params.meta {
252 params.insert("_meta".to_string(), serde_json::json!(meta));
253 }
254
255 JsonRpcNotification::new_with_object_params(notification.method, params)
256 }
257}
258
259#[async_trait]
260impl NotificationBroadcaster for StreamManagerNotificationBroadcaster {
261 async fn send_progress_notification(
264 &self,
265 session_id: &str,
266 notification: ProgressNotification,
267 ) -> Result<(), BroadcastError> {
268 let json_rpc_notification = conversion::progress_to_json_rpc(notification);
269 self.send_notification(session_id, json_rpc_notification)
270 .await
271 }
272
273 async fn send_message_notification(
274 &self,
275 session_id: &str,
276 notification: LoggingMessageNotification,
277 ) -> Result<(), BroadcastError> {
278 let json_rpc_notification = conversion::message_to_json_rpc(notification);
279 self.send_notification(session_id, json_rpc_notification)
280 .await
281 }
282
283 async fn send_resource_updated_notification(
284 &self,
285 session_id: &str,
286 notification: ResourceUpdatedNotification,
287 ) -> Result<(), BroadcastError> {
288 let json_rpc_notification = conversion::resource_updated_to_json_rpc(notification);
289 self.send_notification(session_id, json_rpc_notification)
290 .await
291 }
292
293 async fn send_resource_list_changed_notification(
294 &self,
295 session_id: &str,
296 notification: ResourceListChangedNotification,
297 ) -> Result<(), BroadcastError> {
298 let json_rpc_notification = conversion::resource_list_changed_to_json_rpc(notification);
299 self.send_notification(session_id, json_rpc_notification)
300 .await
301 }
302
303 async fn send_tool_list_changed_notification(
304 &self,
305 session_id: &str,
306 notification: ToolListChangedNotification,
307 ) -> Result<(), BroadcastError> {
308 let json_rpc_notification = conversion::tool_list_changed_to_json_rpc(notification);
309 self.send_notification(session_id, json_rpc_notification)
310 .await
311 }
312
313 async fn send_prompt_list_changed_notification(
314 &self,
315 session_id: &str,
316 notification: PromptListChangedNotification,
317 ) -> Result<(), BroadcastError> {
318 let json_rpc_notification = conversion::prompt_list_changed_to_json_rpc(notification);
319 self.send_notification(session_id, json_rpc_notification)
320 .await
321 }
322
323 async fn send_cancelled_notification(
326 &self,
327 session_id: &str,
328 notification: CancelledNotification,
329 ) -> Result<(), BroadcastError> {
330 let json_rpc_notification = conversion::cancelled_to_json_rpc(notification);
331 self.send_notification(session_id, json_rpc_notification)
332 .await
333 }
334
335 async fn broadcast_to_all_sessions(
338 &self,
339 notification: JsonRpcNotification,
340 ) -> Result<Vec<String>, BroadcastError> {
341 let sse_data =
343 serde_json::to_value(¬ification).map_err(BroadcastError::SerializationError)?;
344
345 match self
347 .stream_manager
348 .broadcast_to_all_sessions(
349 notification.method.clone(), sse_data,
351 )
352 .await
353 {
354 Ok(failed_sessions) => {
355 info!(
356 "📡 Broadcast JSON-RPC notification to all sessions: method={}, failed={}",
357 notification.method,
358 failed_sessions.len()
359 );
360 Ok(failed_sessions)
361 }
362 Err(e) => {
363 error!(
364 "❌ Failed to broadcast JSON-RPC notification: method={}, error={}",
365 notification.method, e
366 );
367 Err(BroadcastError::BroadcastFailed(e.to_string()))
368 }
369 }
370 }
371
372 async fn send_notification(
373 &self,
374 session_id: &str,
375 notification: JsonRpcNotification,
376 ) -> Result<(), BroadcastError> {
377 let sse_data =
379 serde_json::to_value(¬ification).map_err(BroadcastError::SerializationError)?;
380
381 match self
383 .stream_manager
384 .broadcast_to_session(
385 session_id,
386 notification.method.clone(), sse_data,
388 )
389 .await
390 {
391 Ok(event_id) => {
392 info!(
393 "✅ Sent JSON-RPC notification: session={}, method={}, event_id={}",
394 session_id, notification.method, event_id
395 );
396 Ok(())
397 }
398 Err(e) => {
399 error!(
400 "❌ Failed to send JSON-RPC notification: session={}, method={}, error={}",
401 session_id, notification.method, e
402 );
403 Err(BroadcastError::BroadcastFailed(e.to_string()))
404 }
405 }
406 }
407}
408
409pub type SharedNotificationBroadcaster = Arc<dyn NotificationBroadcaster + Send + Sync>;