turul_mcp_server/
notifications.rs1use async_trait::async_trait;
6use serde_json::Value;
7use turul_mcp_builders::prelude::*;
8use turul_mcp_protocol::{McpResult, notifications::Notification};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum DeliveryStatus {
13 Pending,
14 Sent,
15 Acknowledged,
16 Failed,
17 Retrying,
18}
19
20#[derive(Debug, Clone)]
22pub struct DeliveryResult {
23 pub status: DeliveryStatus,
24 pub attempts: u32,
25 pub error: Option<String>,
26 pub delivered_at: Option<u64>, }
28
29#[async_trait]
35pub trait McpNotification: NotificationDefinition + Send + Sync {
36 async fn send(&self, payload: Value) -> McpResult<DeliveryResult>;
41
42 fn can_send(&self, method: &str) -> bool {
47 method == self.method()
48 }
49
50 fn priority(&self) -> u32 {
55 0
56 }
57
58 async fn validate_payload(&self, payload: &Value) -> McpResult<()> {
62 if payload.is_null() && self.requires_ack() {
64 return Err(turul_mcp_protocol::McpError::validation(
65 "Payload cannot be null for notifications requiring acknowledgment",
66 ));
67 }
68 Ok(())
69 }
70
71 async fn transform_payload(&self, payload: Value) -> McpResult<Value> {
76 Ok(payload)
77 }
78
79 async fn handle_error(
84 &self,
85 _error: &turul_mcp_protocol::McpError,
86 attempt: u32,
87 ) -> McpResult<bool> {
88 Ok(attempt < self.max_retries())
90 }
91
92 async fn batch_send(&self, payloads: Vec<Value>) -> McpResult<Vec<DeliveryResult>> {
97 let mut results = Vec::new();
99 for payload in payloads {
100 results.push(self.send(payload).await?);
101 }
102 Ok(results)
103 }
104
105 async fn on_acknowledged(&self, _delivery_result: &DeliveryResult) -> McpResult<()> {
110 Ok(())
112 }
113
114 async fn check_status(&self, _notification_id: &str) -> McpResult<DeliveryStatus> {
118 Ok(DeliveryStatus::Sent)
120 }
121}
122
123pub fn notification_to_protocol(
128 notification: &dyn McpNotification,
129 payload: Value,
130) -> Notification {
131 let mut protocol_notification = notification.to_notification();
132 if protocol_notification.params.is_none() {
134 use turul_mcp_protocol::notifications::NotificationParams;
135 let mut params = NotificationParams::new();
136 if let Ok(obj) = serde_json::from_value::<std::collections::HashMap<String, Value>>(payload)
137 {
138 params.other = obj;
139 }
140 protocol_notification.params = Some(params);
141 }
142 protocol_notification
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148 use serde_json::json;
149 struct TestNotification {
152 method: String,
153 payload: Option<Value>,
154 priority: u32,
155 }
156
157 impl HasNotificationMetadata for TestNotification {
159 fn method(&self) -> &str {
160 &self.method
161 }
162
163 fn requires_ack(&self) -> bool {
164 self.method.contains("important")
165 }
166 }
167
168 impl HasNotificationPayload for TestNotification {
169 fn payload(&self) -> Option<Value> {
170 self.payload.clone()
171 }
172 }
173
174 impl HasNotificationRules for TestNotification {
175 fn priority(&self) -> u32 {
176 self.priority
177 }
178
179 fn can_batch(&self) -> bool {
180 !self.method.contains("urgent")
181 }
182
183 fn max_retries(&self) -> u32 {
184 if self.method.contains("critical") {
185 5
186 } else {
187 3
188 }
189 }
190 }
191
192 #[async_trait]
195 impl McpNotification for TestNotification {
196 async fn send(&self, payload: Value) -> McpResult<DeliveryResult> {
197 println!(
199 "Sending notification: {} with payload: {}",
200 self.method, payload
201 );
202
203 Ok(DeliveryResult {
204 status: DeliveryStatus::Sent,
205 attempts: 1,
206 error: None,
207 delivered_at: Some(
208 std::time::SystemTime::now()
209 .duration_since(std::time::UNIX_EPOCH)
210 .unwrap()
211 .as_secs(),
212 ),
213 })
214 }
215 }
216
217 #[test]
218 fn test_notification_trait() {
219 let notification = TestNotification {
220 method: "notifications/test".to_string(),
221 payload: Some(json!({"data": "test"})),
222 priority: 5,
223 };
224
225 assert_eq!(notification.method(), "notifications/test");
226 assert_eq!(HasNotificationRules::priority(¬ification), 5);
227 assert!(!notification.requires_ack());
228 assert!(notification.can_batch());
229 assert_eq!(notification.max_retries(), 3);
230 }
231
232 #[tokio::test]
233 async fn test_notification_validation() {
234 let notification = TestNotification {
235 method: "notifications/important/test".to_string(),
236 payload: None,
237 priority: 0,
238 };
239
240 let result = notification.validate_payload(&Value::Null).await;
241 assert!(result.is_err()); let valid_result = notification.validate_payload(&json!({"valid": true})).await;
244 assert!(valid_result.is_ok());
245 }
246
247 #[tokio::test]
248 async fn test_notification_sending() {
249 let notification = TestNotification {
250 method: "notifications/test".to_string(),
251 payload: Some(json!({"test": true})),
252 priority: 1,
253 };
254
255 let payload = json!({"message": "test notification"});
256 let result = notification.send(payload).await.unwrap();
257
258 assert_eq!(result.status, DeliveryStatus::Sent);
259 assert_eq!(result.attempts, 1);
260 assert!(result.error.is_none());
261 assert!(result.delivered_at.is_some());
262 }
263
264 #[tokio::test]
265 async fn test_batch_sending() {
266 let notification = TestNotification {
267 method: "notifications/batch_test".to_string(),
268 payload: None,
269 priority: 2,
270 };
271
272 let payloads = vec![
273 json!({"id": 1, "data": "first"}),
274 json!({"id": 2, "data": "second"}),
275 json!({"id": 3, "data": "third"}),
276 ];
277
278 let results = notification.batch_send(payloads).await.unwrap();
279 assert_eq!(results.len(), 3);
280
281 for result in results {
282 assert_eq!(result.status, DeliveryStatus::Sent);
283 assert_eq!(result.attempts, 1);
284 }
285 }
286
287 #[tokio::test]
288 async fn test_delivery_status() {
289 let notification = TestNotification {
290 method: "notifications/status_test".to_string(),
291 payload: None,
292 priority: 0,
293 };
294
295 let status = notification
296 .check_status("test-notification-123")
297 .await
298 .unwrap();
299 assert_eq!(status, DeliveryStatus::Sent);
300 }
301}