Skip to main content

adk_awp/
events.rs

1//! AWP event subscription system with HMAC-SHA256 webhook delivery.
2
3use async_trait::async_trait;
4use awp_types::AwpError;
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use hmac::{Hmac, Mac};
8use serde::{Deserialize, Serialize};
9use sha2::Sha256;
10use uuid::Uuid;
11
12type HmacSha256 = Hmac<Sha256>;
13
14/// An AWP event to be delivered to subscribers.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(rename_all = "camelCase")]
17pub struct AwpEvent {
18    /// Unique event identifier.
19    pub id: Uuid,
20    /// Event type (e.g. `"health.changed"`, `"consent.captured"`).
21    pub event_type: String,
22    /// When the event occurred.
23    pub timestamp: DateTime<Utc>,
24    /// Event payload.
25    pub payload: serde_json::Value,
26}
27
28/// A webhook subscription for AWP events.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct EventSubscription {
32    /// Unique subscription identifier.
33    pub id: Uuid,
34    /// Human-readable subscriber name.
35    pub subscriber: String,
36    /// URL to POST events to.
37    pub callback_url: String,
38    /// Event types this subscription listens for.
39    pub event_types: Vec<String>,
40    /// Shared secret for HMAC-SHA256 signing.
41    pub secret: String,
42}
43
44/// Trait for managing event subscriptions and delivering events.
45#[async_trait]
46pub trait EventSubscriptionService: Send + Sync {
47    /// Create a new subscription, returning its ID.
48    async fn create(&self, subscription: EventSubscription) -> Result<Uuid, AwpError>;
49
50    /// Get a subscription by ID.
51    async fn get(&self, id: Uuid) -> Result<Option<EventSubscription>, AwpError>;
52
53    /// List all subscriptions.
54    async fn list(&self) -> Result<Vec<EventSubscription>, AwpError>;
55
56    /// Delete a subscription by ID.
57    async fn delete(&self, id: Uuid) -> Result<(), AwpError>;
58
59    /// Deliver an event to all matching subscribers.
60    async fn deliver(&self, event: AwpEvent) -> Result<(), AwpError>;
61}
62
63/// In-memory event subscription service backed by [`DashMap`].
64///
65/// Webhook delivery is logged but not actually performed (no HTTP client).
66/// Enable the `webhook-delivery` feature for real HTTP delivery.
67pub struct InMemoryEventSubscriptionService {
68    subscriptions: DashMap<Uuid, EventSubscription>,
69}
70
71impl InMemoryEventSubscriptionService {
72    /// Create a new empty event subscription service.
73    pub fn new() -> Self {
74        Self { subscriptions: DashMap::new() }
75    }
76}
77
78impl Default for InMemoryEventSubscriptionService {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84#[async_trait]
85impl EventSubscriptionService for InMemoryEventSubscriptionService {
86    async fn create(&self, subscription: EventSubscription) -> Result<Uuid, AwpError> {
87        let id = subscription.id;
88        self.subscriptions.insert(id, subscription);
89        Ok(id)
90    }
91
92    async fn get(&self, id: Uuid) -> Result<Option<EventSubscription>, AwpError> {
93        Ok(self.subscriptions.get(&id).map(|e| e.value().clone()))
94    }
95
96    async fn list(&self) -> Result<Vec<EventSubscription>, AwpError> {
97        Ok(self.subscriptions.iter().map(|e| e.value().clone()).collect())
98    }
99
100    async fn delete(&self, id: Uuid) -> Result<(), AwpError> {
101        self.subscriptions.remove(&id);
102        Ok(())
103    }
104
105    async fn deliver(&self, event: AwpEvent) -> Result<(), AwpError> {
106        let payload = serde_json::to_vec(&event)
107            .map_err(|e| AwpError::InternalError(format!("event serialization failed: {e}")))?;
108
109        for entry in self.subscriptions.iter() {
110            let sub = entry.value();
111            // Check if subscription matches this event type
112            if !sub.event_types.is_empty()
113                && !sub.event_types.iter().any(|t| t == &event.event_type || t == "*")
114            {
115                continue;
116            }
117
118            let signature = sign_payload(&payload, &sub.secret);
119            tracing::info!(
120                subscriber = %sub.subscriber,
121                callback_url = %sub.callback_url,
122                event_type = %event.event_type,
123                signature = %signature,
124                "would deliver webhook (in-memory mode)"
125            );
126        }
127
128        Ok(())
129    }
130}
131
132/// Compute HMAC-SHA256 of a payload with the given secret.
133///
134/// Returns the signature in `sha256={hex_digest}` format.
135pub fn sign_payload(payload: &[u8], secret: &str) -> String {
136    let mut mac =
137        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key length");
138    mac.update(payload);
139    let result = mac.finalize();
140    format!("sha256={}", hex::encode(result.into_bytes()))
141}
142
143/// Verify an HMAC-SHA256 signature against a payload and secret.
144///
145/// The `signature` should be in `sha256={hex_digest}` format.
146pub fn verify_signature(payload: &[u8], secret: &str, signature: &str) -> bool {
147    // Constant-time comparison via the hmac crate
148    let mut mac =
149        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key length");
150    mac.update(payload);
151
152    if let Some(hex_sig) = signature.strip_prefix("sha256=") {
153        if let Ok(sig_bytes) = hex::decode(hex_sig) {
154            return mac.verify_slice(&sig_bytes).is_ok();
155        }
156    }
157    false
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163
164    fn sample_subscription(event_types: Vec<String>) -> EventSubscription {
165        EventSubscription {
166            id: Uuid::now_v7(),
167            subscriber: "test-subscriber".to_string(),
168            callback_url: "https://example.com/webhook".to_string(),
169            event_types,
170            secret: "test-secret-key".to_string(),
171        }
172    }
173
174    fn sample_event() -> AwpEvent {
175        AwpEvent {
176            id: Uuid::now_v7(),
177            event_type: "health.changed".to_string(),
178            timestamp: Utc::now(),
179            payload: serde_json::json!({"state": "degrading"}),
180        }
181    }
182
183    #[tokio::test]
184    async fn test_create_and_get() {
185        let svc = InMemoryEventSubscriptionService::new();
186        let sub = sample_subscription(vec!["health.changed".to_string()]);
187        let id = sub.id;
188        svc.create(sub).await.unwrap();
189
190        let retrieved = svc.get(id).await.unwrap().unwrap();
191        assert_eq!(retrieved.subscriber, "test-subscriber");
192    }
193
194    #[tokio::test]
195    async fn test_get_nonexistent() {
196        let svc = InMemoryEventSubscriptionService::new();
197        assert!(svc.get(Uuid::now_v7()).await.unwrap().is_none());
198    }
199
200    #[tokio::test]
201    async fn test_list() {
202        let svc = InMemoryEventSubscriptionService::new();
203        svc.create(sample_subscription(vec![])).await.unwrap();
204        svc.create(sample_subscription(vec![])).await.unwrap();
205
206        let list = svc.list().await.unwrap();
207        assert_eq!(list.len(), 2);
208    }
209
210    #[tokio::test]
211    async fn test_delete() {
212        let svc = InMemoryEventSubscriptionService::new();
213        let sub = sample_subscription(vec![]);
214        let id = sub.id;
215        svc.create(sub).await.unwrap();
216        assert!(svc.get(id).await.unwrap().is_some());
217
218        svc.delete(id).await.unwrap();
219        assert!(svc.get(id).await.unwrap().is_none());
220    }
221
222    #[tokio::test]
223    async fn test_deliver_matching_event() {
224        let svc = InMemoryEventSubscriptionService::new();
225        svc.create(sample_subscription(vec!["health.changed".to_string()])).await.unwrap();
226        // Should not error — just logs
227        svc.deliver(sample_event()).await.unwrap();
228    }
229
230    #[tokio::test]
231    async fn test_deliver_wildcard_subscription() {
232        let svc = InMemoryEventSubscriptionService::new();
233        svc.create(sample_subscription(vec!["*".to_string()])).await.unwrap();
234        svc.deliver(sample_event()).await.unwrap();
235    }
236
237    #[tokio::test]
238    async fn test_deliver_non_matching_event() {
239        let svc = InMemoryEventSubscriptionService::new();
240        svc.create(sample_subscription(vec!["consent.captured".to_string()])).await.unwrap();
241        // health.changed doesn't match consent.captured — should still succeed (no delivery)
242        svc.deliver(sample_event()).await.unwrap();
243    }
244
245    #[test]
246    fn test_sign_payload() {
247        let payload = b"hello world";
248        let sig = sign_payload(payload, "secret");
249        assert!(sig.starts_with("sha256="));
250        assert_eq!(sig.len(), 7 + 64); // "sha256=" + 64 hex chars
251    }
252
253    #[test]
254    fn test_verify_signature_valid() {
255        let payload = b"test payload";
256        let secret = "my-secret";
257        let sig = sign_payload(payload, secret);
258        assert!(verify_signature(payload, secret, &sig));
259    }
260
261    #[test]
262    fn test_verify_signature_wrong_payload() {
263        let secret = "my-secret";
264        let sig = sign_payload(b"original", secret);
265        assert!(!verify_signature(b"tampered", secret, &sig));
266    }
267
268    #[test]
269    fn test_verify_signature_wrong_secret() {
270        let payload = b"test payload";
271        let sig = sign_payload(payload, "secret1");
272        assert!(!verify_signature(payload, "secret2", &sig));
273    }
274
275    #[test]
276    fn test_verify_signature_invalid_format() {
277        assert!(!verify_signature(b"payload", "secret", "not-a-signature"));
278    }
279
280    #[test]
281    fn test_verify_signature_invalid_hex() {
282        assert!(!verify_signature(b"payload", "secret", "sha256=not-hex"));
283    }
284
285    #[test]
286    fn test_event_serialization_round_trip() {
287        let event = sample_event();
288        let json = serde_json::to_string(&event).unwrap();
289        let parsed: AwpEvent = serde_json::from_str(&json).unwrap();
290        assert_eq!(event.id, parsed.id);
291        assert_eq!(event.event_type, parsed.event_type);
292    }
293
294    #[test]
295    fn test_subscription_serialization_round_trip() {
296        let sub = sample_subscription(vec!["health.changed".to_string()]);
297        let json = serde_json::to_string(&sub).unwrap();
298        let parsed: EventSubscription = serde_json::from_str(&json).unwrap();
299        assert_eq!(sub.id, parsed.id);
300        assert_eq!(sub.subscriber, parsed.subscriber);
301    }
302}