1use async_trait::async_trait;
4use awp_types::AwpError;
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use hmac::{Hmac, Mac};
8use serde::{Deserialize, Serialize};
9use sha2::Sha256;
10use uuid::Uuid;
11
12type HmacSha256 = Hmac<Sha256>;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(rename_all = "camelCase")]
17pub struct AwpEvent {
18 pub id: Uuid,
20 pub event_type: String,
22 pub timestamp: DateTime<Utc>,
24 pub payload: serde_json::Value,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct EventSubscription {
32 pub id: Uuid,
34 pub subscriber: String,
36 pub callback_url: String,
38 pub event_types: Vec<String>,
40 pub secret: String,
42}
43
44#[async_trait]
46pub trait EventSubscriptionService: Send + Sync {
47 async fn create(&self, subscription: EventSubscription) -> Result<Uuid, AwpError>;
49
50 async fn get(&self, id: Uuid) -> Result<Option<EventSubscription>, AwpError>;
52
53 async fn list(&self) -> Result<Vec<EventSubscription>, AwpError>;
55
56 async fn delete(&self, id: Uuid) -> Result<(), AwpError>;
58
59 async fn deliver(&self, event: AwpEvent) -> Result<(), AwpError>;
61}
62
63pub struct InMemoryEventSubscriptionService {
68 subscriptions: DashMap<Uuid, EventSubscription>,
69}
70
71impl InMemoryEventSubscriptionService {
72 pub fn new() -> Self {
74 Self { subscriptions: DashMap::new() }
75 }
76}
77
78impl Default for InMemoryEventSubscriptionService {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl EventSubscriptionService for InMemoryEventSubscriptionService {
86 async fn create(&self, subscription: EventSubscription) -> Result<Uuid, AwpError> {
87 let id = subscription.id;
88 self.subscriptions.insert(id, subscription);
89 Ok(id)
90 }
91
92 async fn get(&self, id: Uuid) -> Result<Option<EventSubscription>, AwpError> {
93 Ok(self.subscriptions.get(&id).map(|e| e.value().clone()))
94 }
95
96 async fn list(&self) -> Result<Vec<EventSubscription>, AwpError> {
97 Ok(self.subscriptions.iter().map(|e| e.value().clone()).collect())
98 }
99
100 async fn delete(&self, id: Uuid) -> Result<(), AwpError> {
101 self.subscriptions.remove(&id);
102 Ok(())
103 }
104
105 async fn deliver(&self, event: AwpEvent) -> Result<(), AwpError> {
106 let payload = serde_json::to_vec(&event)
107 .map_err(|e| AwpError::InternalError(format!("event serialization failed: {e}")))?;
108
109 for entry in self.subscriptions.iter() {
110 let sub = entry.value();
111 if !sub.event_types.is_empty()
113 && !sub.event_types.iter().any(|t| t == &event.event_type || t == "*")
114 {
115 continue;
116 }
117
118 let signature = sign_payload(&payload, &sub.secret);
119 tracing::info!(
120 subscriber = %sub.subscriber,
121 callback_url = %sub.callback_url,
122 event_type = %event.event_type,
123 signature = %signature,
124 "would deliver webhook (in-memory mode)"
125 );
126 }
127
128 Ok(())
129 }
130}
131
132pub fn sign_payload(payload: &[u8], secret: &str) -> String {
136 let mut mac =
137 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key length");
138 mac.update(payload);
139 let result = mac.finalize();
140 format!("sha256={}", hex::encode(result.into_bytes()))
141}
142
143pub fn verify_signature(payload: &[u8], secret: &str, signature: &str) -> bool {
147 let mut mac =
149 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key length");
150 mac.update(payload);
151
152 if let Some(hex_sig) = signature.strip_prefix("sha256=") {
153 if let Ok(sig_bytes) = hex::decode(hex_sig) {
154 return mac.verify_slice(&sig_bytes).is_ok();
155 }
156 }
157 false
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163
164 fn sample_subscription(event_types: Vec<String>) -> EventSubscription {
165 EventSubscription {
166 id: Uuid::now_v7(),
167 subscriber: "test-subscriber".to_string(),
168 callback_url: "https://example.com/webhook".to_string(),
169 event_types,
170 secret: "test-secret-key".to_string(),
171 }
172 }
173
174 fn sample_event() -> AwpEvent {
175 AwpEvent {
176 id: Uuid::now_v7(),
177 event_type: "health.changed".to_string(),
178 timestamp: Utc::now(),
179 payload: serde_json::json!({"state": "degrading"}),
180 }
181 }
182
183 #[tokio::test]
184 async fn test_create_and_get() {
185 let svc = InMemoryEventSubscriptionService::new();
186 let sub = sample_subscription(vec!["health.changed".to_string()]);
187 let id = sub.id;
188 svc.create(sub).await.unwrap();
189
190 let retrieved = svc.get(id).await.unwrap().unwrap();
191 assert_eq!(retrieved.subscriber, "test-subscriber");
192 }
193
194 #[tokio::test]
195 async fn test_get_nonexistent() {
196 let svc = InMemoryEventSubscriptionService::new();
197 assert!(svc.get(Uuid::now_v7()).await.unwrap().is_none());
198 }
199
200 #[tokio::test]
201 async fn test_list() {
202 let svc = InMemoryEventSubscriptionService::new();
203 svc.create(sample_subscription(vec![])).await.unwrap();
204 svc.create(sample_subscription(vec![])).await.unwrap();
205
206 let list = svc.list().await.unwrap();
207 assert_eq!(list.len(), 2);
208 }
209
210 #[tokio::test]
211 async fn test_delete() {
212 let svc = InMemoryEventSubscriptionService::new();
213 let sub = sample_subscription(vec![]);
214 let id = sub.id;
215 svc.create(sub).await.unwrap();
216 assert!(svc.get(id).await.unwrap().is_some());
217
218 svc.delete(id).await.unwrap();
219 assert!(svc.get(id).await.unwrap().is_none());
220 }
221
222 #[tokio::test]
223 async fn test_deliver_matching_event() {
224 let svc = InMemoryEventSubscriptionService::new();
225 svc.create(sample_subscription(vec!["health.changed".to_string()])).await.unwrap();
226 svc.deliver(sample_event()).await.unwrap();
228 }
229
230 #[tokio::test]
231 async fn test_deliver_wildcard_subscription() {
232 let svc = InMemoryEventSubscriptionService::new();
233 svc.create(sample_subscription(vec!["*".to_string()])).await.unwrap();
234 svc.deliver(sample_event()).await.unwrap();
235 }
236
237 #[tokio::test]
238 async fn test_deliver_non_matching_event() {
239 let svc = InMemoryEventSubscriptionService::new();
240 svc.create(sample_subscription(vec!["consent.captured".to_string()])).await.unwrap();
241 svc.deliver(sample_event()).await.unwrap();
243 }
244
245 #[test]
246 fn test_sign_payload() {
247 let payload = b"hello world";
248 let sig = sign_payload(payload, "secret");
249 assert!(sig.starts_with("sha256="));
250 assert_eq!(sig.len(), 7 + 64); }
252
253 #[test]
254 fn test_verify_signature_valid() {
255 let payload = b"test payload";
256 let secret = "my-secret";
257 let sig = sign_payload(payload, secret);
258 assert!(verify_signature(payload, secret, &sig));
259 }
260
261 #[test]
262 fn test_verify_signature_wrong_payload() {
263 let secret = "my-secret";
264 let sig = sign_payload(b"original", secret);
265 assert!(!verify_signature(b"tampered", secret, &sig));
266 }
267
268 #[test]
269 fn test_verify_signature_wrong_secret() {
270 let payload = b"test payload";
271 let sig = sign_payload(payload, "secret1");
272 assert!(!verify_signature(payload, "secret2", &sig));
273 }
274
275 #[test]
276 fn test_verify_signature_invalid_format() {
277 assert!(!verify_signature(b"payload", "secret", "not-a-signature"));
278 }
279
280 #[test]
281 fn test_verify_signature_invalid_hex() {
282 assert!(!verify_signature(b"payload", "secret", "sha256=not-hex"));
283 }
284
285 #[test]
286 fn test_event_serialization_round_trip() {
287 let event = sample_event();
288 let json = serde_json::to_string(&event).unwrap();
289 let parsed: AwpEvent = serde_json::from_str(&json).unwrap();
290 assert_eq!(event.id, parsed.id);
291 assert_eq!(event.event_type, parsed.event_type);
292 }
293
294 #[test]
295 fn test_subscription_serialization_round_trip() {
296 let sub = sample_subscription(vec!["health.changed".to_string()]);
297 let json = serde_json::to_string(&sub).unwrap();
298 let parsed: EventSubscription = serde_json::from_str(&json).unwrap();
299 assert_eq!(sub.id, parsed.id);
300 assert_eq!(sub.subscriber, parsed.subscriber);
301 }
302}