Skip to main content

allsource_core/application/services/
webhook.rs

1use crate::domain::entities::Event;
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7/// A registered webhook subscription
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct WebhookSubscription {
10    /// Unique ID for this subscription
11    pub id: Uuid,
12    /// Tenant that owns this webhook
13    pub tenant_id: String,
14    /// URL to deliver events to
15    pub url: String,
16    /// Optional: only deliver events matching these event types (glob patterns)
17    /// If empty, all events are delivered
18    pub event_types: Vec<String>,
19    /// Optional: only deliver events matching these entity IDs
20    pub entity_ids: Vec<String>,
21    /// Secret used to sign webhook payloads (HMAC-SHA256)
22    pub secret: String,
23    /// Whether this webhook is currently active
24    pub active: bool,
25    /// When this webhook was created
26    pub created_at: DateTime<Utc>,
27    /// When this webhook was last updated
28    pub updated_at: DateTime<Utc>,
29    /// Optional description
30    pub description: Option<String>,
31}
32
33/// Request to register a new webhook
34#[derive(Debug, Clone, Deserialize)]
35pub struct RegisterWebhookRequest {
36    pub tenant_id: String,
37    pub url: String,
38    #[serde(default)]
39    pub event_types: Vec<String>,
40    #[serde(default)]
41    pub entity_ids: Vec<String>,
42    pub secret: Option<String>,
43    pub description: Option<String>,
44}
45
46/// Request to update an existing webhook
47#[derive(Debug, Clone, Deserialize)]
48pub struct UpdateWebhookRequest {
49    pub url: Option<String>,
50    pub event_types: Option<Vec<String>>,
51    pub entity_ids: Option<Vec<String>>,
52    pub active: Option<bool>,
53    pub description: Option<String>,
54}
55
56/// A webhook delivery attempt record
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct WebhookDelivery {
59    pub id: Uuid,
60    pub webhook_id: Uuid,
61    pub event_id: Uuid,
62    pub status: DeliveryStatus,
63    pub attempt: u32,
64    pub max_attempts: u32,
65    pub response_status: Option<u16>,
66    pub response_body: Option<String>,
67    pub error: Option<String>,
68    pub created_at: DateTime<Utc>,
69    pub next_retry_at: Option<DateTime<Utc>>,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
73#[serde(rename_all = "lowercase")]
74pub enum DeliveryStatus {
75    Pending,
76    Success,
77    Failed,
78    Retrying,
79}
80
81/// Manages webhook subscriptions in-memory using DashMap
82pub struct WebhookRegistry {
83    /// Webhooks indexed by ID
84    webhooks: DashMap<Uuid, WebhookSubscription>,
85    /// Index: tenant_id -> list of webhook IDs
86    tenant_index: DashMap<String, Vec<Uuid>>,
87    /// Delivery log (webhook_id -> deliveries)
88    deliveries: DashMap<Uuid, Vec<WebhookDelivery>>,
89}
90
91impl Default for WebhookRegistry {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl WebhookRegistry {
98    pub fn new() -> Self {
99        Self {
100            webhooks: DashMap::new(),
101            tenant_index: DashMap::new(),
102            deliveries: DashMap::new(),
103        }
104    }
105
106    /// Register a new webhook subscription
107    pub fn register(&self, req: RegisterWebhookRequest) -> WebhookSubscription {
108        let now = Utc::now();
109        let secret = req.secret.unwrap_or_else(|| {
110            // Generate a random secret if none provided
111            format!("whsec_{}", Uuid::new_v4().to_string().replace('-', ""))
112        });
113
114        let webhook = WebhookSubscription {
115            id: Uuid::new_v4(),
116            tenant_id: req.tenant_id.clone(),
117            url: req.url,
118            event_types: req.event_types,
119            entity_ids: req.entity_ids,
120            secret,
121            active: true,
122            created_at: now,
123            updated_at: now,
124            description: req.description,
125        };
126
127        let id = webhook.id;
128        self.webhooks.insert(id, webhook.clone());
129
130        // Update tenant index
131        self.tenant_index.entry(req.tenant_id).or_default().push(id);
132
133        webhook
134    }
135
136    /// Get a webhook by ID
137    pub fn get(&self, id: Uuid) -> Option<WebhookSubscription> {
138        self.webhooks.get(&id).map(|w| w.clone())
139    }
140
141    /// List webhooks for a tenant
142    pub fn list_by_tenant(&self, tenant_id: &str) -> Vec<WebhookSubscription> {
143        self.tenant_index
144            .get(tenant_id)
145            .map(|ids| {
146                ids.iter()
147                    .filter_map(|id| self.webhooks.get(id).map(|w| w.clone()))
148                    .collect()
149            })
150            .unwrap_or_default()
151    }
152
153    /// Update a webhook
154    pub fn update(&self, id: Uuid, req: UpdateWebhookRequest) -> Option<WebhookSubscription> {
155        let mut webhook = self.webhooks.get_mut(&id)?;
156        let w = webhook.value_mut();
157
158        if let Some(url) = req.url {
159            w.url = url;
160        }
161        if let Some(event_types) = req.event_types {
162            w.event_types = event_types;
163        }
164        if let Some(entity_ids) = req.entity_ids {
165            w.entity_ids = entity_ids;
166        }
167        if let Some(active) = req.active {
168            w.active = active;
169        }
170        if let Some(description) = req.description {
171            w.description = Some(description);
172        }
173        w.updated_at = Utc::now();
174
175        Some(w.clone())
176    }
177
178    /// Delete a webhook
179    pub fn delete(&self, id: Uuid) -> Option<WebhookSubscription> {
180        let (_, webhook) = self.webhooks.remove(&id)?;
181
182        // Remove from tenant index
183        if let Some(mut ids) = self.tenant_index.get_mut(&webhook.tenant_id) {
184            ids.retain(|wid| *wid != id);
185        }
186
187        // Clean up deliveries
188        self.deliveries.remove(&id);
189
190        Some(webhook)
191    }
192
193    /// Find all active webhooks that match a given event
194    pub fn find_matching(&self, event: &Event) -> Vec<WebhookSubscription> {
195        // Get all webhooks (in a real multi-tenant setup, we'd filter by tenant_id
196        // from the event, but Core events currently use "default" tenant)
197        let mut matching = Vec::new();
198
199        for entry in self.webhooks.iter() {
200            let webhook = entry.value();
201            if !webhook.active {
202                continue;
203            }
204
205            // Check event type filter
206            if !webhook.event_types.is_empty()
207                && !webhook
208                    .event_types
209                    .iter()
210                    .any(|pattern| matches_pattern(pattern, event.event_type_str()))
211            {
212                continue;
213            }
214
215            // Check entity ID filter
216            if !webhook.entity_ids.is_empty()
217                && !webhook
218                    .entity_ids
219                    .contains(&event.entity_id_str().to_string())
220            {
221                continue;
222            }
223
224            matching.push(webhook.clone());
225        }
226
227        matching
228    }
229
230    /// Record a delivery attempt
231    pub fn record_delivery(&self, delivery: WebhookDelivery) {
232        self.deliveries
233            .entry(delivery.webhook_id)
234            .or_default()
235            .push(delivery);
236    }
237
238    /// Get delivery history for a webhook
239    pub fn get_deliveries(&self, webhook_id: Uuid, limit: usize) -> Vec<WebhookDelivery> {
240        self.deliveries
241            .get(&webhook_id)
242            .map(|deliveries| {
243                let d = deliveries.value();
244                let start = d.len().saturating_sub(limit);
245                d[start..].to_vec()
246            })
247            .unwrap_or_default()
248    }
249}
250
/// Simple glob-style pattern matching for event types.
///
/// Three pattern forms are recognised:
/// - full wildcard: `"*"` matches every value;
/// - wildcard suffix: `"user.*"` matches any value that starts with `user`
///   immediately followed by a `.` (`"user.created"`, `"user.updated"`), but
///   not `"user"` itself or `"username.changed"`;
/// - anything else must equal the value exactly.
fn matches_pattern(pattern: &str, value: &str) -> bool {
    match pattern.strip_suffix(".*") {
        // `strip_prefix` hands back the remainder directly, so no manual
        // byte-index slicing of `value` is needed.
        Some(prefix) => matches!(value.strip_prefix(prefix), Some(rest) if rest.starts_with('.')),
        None => pattern == "*" || pattern == value,
    }
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event in the "default" tenant with a fixed JSON payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"test": true}),
            None,
        )
        .unwrap()
    }

    /// Registration request with no filters, no secret and no description.
    fn plain_request(tenant_id: &str, url: &str) -> RegisterWebhookRequest {
        RegisterWebhookRequest {
            tenant_id: tenant_id.to_string(),
            url: url.to_string(),
            event_types: Vec::new(),
            entity_ids: Vec::new(),
            secret: None,
            description: None,
        }
    }

    /// Delivery record for attempt 1 of 5 with the given outcome and response.
    fn first_attempt(
        webhook_id: Uuid,
        status: DeliveryStatus,
        response_status: u16,
        response_body: Option<&str>,
    ) -> WebhookDelivery {
        WebhookDelivery {
            id: Uuid::new_v4(),
            webhook_id,
            event_id: Uuid::new_v4(),
            status,
            attempt: 1,
            max_attempts: 5,
            response_status: Some(response_status),
            response_body: response_body.map(str::to_string),
            error: None,
            created_at: Utc::now(),
            next_retry_at: None,
        }
    }

    #[test]
    fn test_register_webhook() {
        let registry = WebhookRegistry::new();

        let request = RegisterWebhookRequest {
            event_types: vec!["user.*".to_string()],
            secret: Some("test-secret".to_string()),
            description: Some("Test webhook".to_string()),
            ..plain_request("tenant-1", "https://example.com/webhook")
        };
        let webhook = registry.register(request);

        assert_eq!(webhook.url, "https://example.com/webhook");
        assert_eq!(webhook.tenant_id, "tenant-1");
        assert!(webhook.active);
        assert_eq!(webhook.secret, "test-secret");
    }

    #[test]
    fn test_get_webhook() {
        let registry = WebhookRegistry::new();
        let webhook = registry.register(plain_request("tenant-1", "https://example.com/webhook"));

        let found = registry.get(webhook.id).unwrap();
        assert_eq!(found.id, webhook.id);
        assert_eq!(found.url, webhook.url);
    }

    #[test]
    fn test_list_by_tenant() {
        let registry = WebhookRegistry::new();

        let registrations = [
            ("tenant-1", "https://example.com/hook1"),
            ("tenant-1", "https://example.com/hook2"),
            ("tenant-2", "https://other.com/hook"),
        ];
        for (tenant_id, url) in registrations {
            registry.register(plain_request(tenant_id, url));
        }

        assert_eq!(registry.list_by_tenant("tenant-1").len(), 2);
        assert_eq!(registry.list_by_tenant("tenant-2").len(), 1);
    }

    #[test]
    fn test_update_webhook() {
        let registry = WebhookRegistry::new();
        let webhook = registry.register(plain_request("tenant-1", "https://example.com/webhook"));

        let changes = UpdateWebhookRequest {
            url: Some("https://example.com/new-url".to_string()),
            event_types: Some(vec!["order.*".to_string()]),
            entity_ids: None,
            active: Some(false),
            description: Some("Updated".to_string()),
        };
        let updated = registry.update(webhook.id, changes).unwrap();

        assert_eq!(updated.url, "https://example.com/new-url");
        assert_eq!(updated.event_types, vec!["order.*".to_string()]);
        assert!(!updated.active);
        assert_eq!(updated.description, Some("Updated".to_string()));
    }

    #[test]
    fn test_delete_webhook() {
        let registry = WebhookRegistry::new();
        let webhook = registry.register(plain_request("tenant-1", "https://example.com/webhook"));

        let deleted = registry.delete(webhook.id);

        assert!(deleted.is_some());
        assert!(registry.get(webhook.id).is_none());
        assert!(registry.list_by_tenant("tenant-1").is_empty());
    }

    #[test]
    fn test_find_matching_all_events() {
        let registry = WebhookRegistry::new();
        registry.register(plain_request("tenant-1", "https://example.com/all"));

        let event = create_test_event("entity-1", "user.created");
        let matching = registry.find_matching(&event);

        assert_eq!(matching.len(), 1);
    }

    #[test]
    fn test_find_matching_event_type_wildcard() {
        let registry = WebhookRegistry::new();
        registry.register(RegisterWebhookRequest {
            event_types: vec!["user.*".to_string()],
            ..plain_request("tenant-1", "https://example.com/users")
        });

        let user_event = create_test_event("entity-1", "user.created");
        assert_eq!(registry.find_matching(&user_event).len(), 1);

        let order_event = create_test_event("entity-1", "order.placed");
        assert_eq!(registry.find_matching(&order_event).len(), 0);
    }

    #[test]
    fn test_find_matching_entity_filter() {
        let registry = WebhookRegistry::new();
        registry.register(RegisterWebhookRequest {
            entity_ids: vec!["entity-1".to_string()],
            ..plain_request("tenant-1", "https://example.com/entity")
        });

        let matching_event = create_test_event("entity-1", "user.created");
        assert_eq!(registry.find_matching(&matching_event).len(), 1);

        let non_matching = create_test_event("entity-2", "user.created");
        assert_eq!(registry.find_matching(&non_matching).len(), 0);
    }

    #[test]
    fn test_find_matching_inactive_skipped() {
        let registry = WebhookRegistry::new();
        let webhook = registry.register(plain_request("tenant-1", "https://example.com/hook"));

        // Deactivate without touching any other field.
        let deactivate = UpdateWebhookRequest {
            url: None,
            event_types: None,
            entity_ids: None,
            active: Some(false),
            description: None,
        };
        registry.update(webhook.id, deactivate);

        let event = create_test_event("entity-1", "user.created");
        assert_eq!(registry.find_matching(&event).len(), 0);
    }

    #[test]
    fn test_matches_pattern() {
        assert!(matches_pattern("*", "anything"));
        assert!(matches_pattern("user.created", "user.created"));
        assert!(!matches_pattern("user.created", "user.updated"));
        assert!(matches_pattern("user.*", "user.created"));
        assert!(matches_pattern("user.*", "user.updated"));
        assert!(!matches_pattern("user.*", "order.placed"));
        assert!(!matches_pattern("user.*", "user"));
    }

    #[test]
    fn test_record_and_get_deliveries() {
        let registry = WebhookRegistry::new();
        let webhook_id = Uuid::new_v4();

        registry.record_delivery(first_attempt(
            webhook_id,
            DeliveryStatus::Success,
            200,
            None,
        ));
        registry.record_delivery(first_attempt(
            webhook_id,
            DeliveryStatus::Failed,
            500,
            Some("Internal error"),
        ));

        let deliveries = registry.get_deliveries(webhook_id, 10);
        assert_eq!(deliveries.len(), 2);
        assert_eq!(deliveries[0].status, DeliveryStatus::Success);
        assert_eq!(deliveries[1].status, DeliveryStatus::Failed);
    }

    #[test]
    fn test_auto_generated_secret() {
        let registry = WebhookRegistry::new();
        let webhook = registry.register(plain_request("tenant-1", "https://example.com/hook"));

        assert!(webhook.secret.starts_with("whsec_"));
        assert!(webhook.secret.len() > 10);
    }
}