Skip to main content

oxirs_stream/
webhook.rs

1//! # Webhook Integration Module
2//!
3//! This module provides comprehensive webhook support for external system integration:
4//! - Webhook registration and management
5//! - Event filtering and routing
6//! - Retry mechanisms with exponential backoff
7//! - Security features (HMAC signing, rate limiting)
8//! - Monitoring and diagnostics
9
10use anyhow::{anyhow, Result};
11// MIGRATED: Using scirs2-core instead of direct rand dependency
12use scirs2_core::random::{Random, Rng};
13use serde::{Deserialize, Serialize};
14use serde_json;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18use tokio::sync::{broadcast, RwLock};
19use tokio::time::interval;
20use tracing::{debug, error, info, warn};
21use uuid::Uuid;
22
23use crate::StreamEvent;
24
25/// Webhook manager for handling HTTP notifications
26pub struct WebhookManager {
27    /// Registered webhooks
28    webhooks: Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
29    /// HTTP client
30    client: reqwest::Client,
31    /// Event queue
32    event_queue: Arc<RwLock<VecDeque<WebhookEvent>>>,
33    /// Configuration
34    config: WebhookConfig,
35    /// Statistics
36    stats: Arc<RwLock<WebhookStats>>,
37    /// Event notifier
38    event_notifier: broadcast::Sender<WebhookNotification>,
39    /// Rate limiter
40    rate_limiter: Arc<RwLock<RateLimiter>>,
41}
42
43/// Registered webhook
44#[derive(Debug, Clone)]
45struct RegisteredWebhook {
46    /// Webhook ID
47    id: String,
48    /// Webhook URL
49    url: String,
50    /// HTTP method
51    method: HttpMethod,
52    /// Custom headers
53    headers: HashMap<String, String>,
54    /// Event filters
55    filters: Vec<EventFilter>,
56    /// Security configuration
57    security: WebhookSecurity,
58    /// Retry configuration
59    retry_config: RetryConfig,
60    /// Rate limit configuration
61    rate_limit: RateLimit,
62    /// Webhook metadata
63    metadata: WebhookMetadata,
64    /// Statistics
65    stats: WebhookStatistics,
66    /// Created timestamp
67    created_at: Instant,
68    /// Last delivery attempt
69    last_delivery: Option<Instant>,
70    /// Status
71    status: WebhookStatus,
72}
73
74/// HTTP methods
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76pub enum HttpMethod {
77    Get,
78    Post,
79    Put,
80    Patch,
81    Delete,
82}
83
84/// Event filter for webhook delivery
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct EventFilter {
87    /// Filter by event type
88    pub event_types: Option<Vec<String>>,
89    /// Filter by graph
90    pub graph_filter: Option<String>,
91    /// Filter by subject pattern (regex string)
92    pub subject_pattern: Option<String>,
93    /// Filter by predicate
94    pub predicate_filter: Option<String>,
95    /// Custom filter expression
96    pub custom_filter: Option<String>,
97}
98
99/// Webhook security configuration
100#[derive(Debug, Clone)]
101pub struct WebhookSecurity {
102    /// HMAC secret for signing
103    hmac_secret: Option<String>,
104    /// Authentication headers
105    auth_headers: HashMap<String, String>,
106    /// SSL/TLS verification
107    verify_ssl: bool,
108    /// Allowed response codes
109    allowed_response_codes: Vec<u16>,
110}
111
112/// Retry configuration
113#[derive(Debug, Clone)]
114pub struct RetryConfig {
115    /// Maximum retry attempts
116    pub max_attempts: u32,
117    /// Initial delay between retries
118    pub initial_delay: Duration,
119    /// Maximum delay between retries
120    pub max_delay: Duration,
121    /// Exponential backoff multiplier
122    pub backoff_multiplier: f64,
123    /// Enable jitter
124    pub enable_jitter: bool,
125}
126
127impl Default for RetryConfig {
128    fn default() -> Self {
129        Self {
130            max_attempts: 3,
131            initial_delay: Duration::from_millis(100),
132            max_delay: Duration::from_secs(30),
133            backoff_multiplier: 2.0,
134            enable_jitter: true,
135        }
136    }
137}
138
139/// Rate limit configuration
140#[derive(Debug, Clone)]
141pub struct RateLimit {
142    /// Requests per second
143    pub requests_per_second: f64,
144    /// Burst size
145    pub burst_size: u32,
146    /// Time window
147    pub window: Duration,
148}
149
150impl Default for RateLimit {
151    fn default() -> Self {
152        Self {
153            requests_per_second: 10.0,
154            burst_size: 20,
155            window: Duration::from_secs(1),
156        }
157    }
158}
159
160/// Webhook metadata
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct WebhookMetadata {
163    /// Webhook name
164    pub name: Option<String>,
165    /// Description
166    pub description: Option<String>,
167    /// Owner
168    pub owner: Option<String>,
169    /// Tags
170    pub tags: Vec<String>,
171    /// Custom properties
172    pub properties: HashMap<String, String>,
173}
174
175/// Webhook statistics
176#[derive(Debug, Clone, Default)]
177pub struct WebhookStatistics {
178    /// Total delivery attempts
179    pub total_attempts: u64,
180    /// Successful deliveries
181    pub successful_deliveries: u64,
182    /// Failed deliveries
183    pub failed_deliveries: u64,
184    /// Average response time
185    pub avg_response_time: Duration,
186    /// Last success timestamp
187    pub last_success: Option<Instant>,
188    /// Last failure timestamp
189    pub last_failure: Option<Instant>,
190    /// Consecutive failures
191    pub consecutive_failures: u32,
192}
193
194/// Webhook status
195#[derive(Debug, Clone, PartialEq)]
196enum WebhookStatus {
197    /// Webhook is active
198    Active,
199    /// Webhook is paused
200    Paused,
201    /// Webhook is disabled due to failures
202    Disabled { reason: String },
203    /// Webhook is being deleted
204    Deleting,
205}
206
207/// Webhook event to be delivered
208#[derive(Debug, Clone)]
209struct WebhookEvent {
210    /// Event ID
211    id: String,
212    /// Target webhook ID
213    webhook_id: String,
214    /// Event payload
215    payload: WebhookPayload,
216    /// Delivery attempts
217    attempts: u32,
218    /// Created timestamp
219    created_at: Instant,
220    /// Next retry time
221    next_retry: Option<Instant>,
222}
223
224/// Webhook payload
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct WebhookPayload {
227    /// Event ID
228    pub event_id: String,
229    /// Event timestamp
230    pub timestamp: chrono::DateTime<chrono::Utc>,
231    /// Event type
232    pub event_type: String,
233    /// Event data
234    pub data: serde_json::Value,
235    /// Metadata
236    pub metadata: HashMap<String, String>,
237}
238
239/// Webhook manager configuration
240#[derive(Debug, Clone)]
241pub struct WebhookConfig {
242    /// Maximum registered webhooks
243    pub max_webhooks: usize,
244    /// Maximum queue size
245    pub max_queue_size: usize,
246    /// Worker thread count
247    pub worker_threads: usize,
248    /// Delivery timeout
249    pub delivery_timeout: Duration,
250    /// Queue processing interval
251    pub queue_interval: Duration,
252    /// Enable automatic retry
253    pub enable_retry: bool,
254    /// Enable rate limiting
255    pub enable_rate_limiting: bool,
256    /// Enable HMAC signing
257    pub enable_hmac: bool,
258}
259
260impl Default for WebhookConfig {
261    fn default() -> Self {
262        Self {
263            max_webhooks: 1000,
264            max_queue_size: 10000,
265            worker_threads: 4,
266            delivery_timeout: Duration::from_secs(30),
267            queue_interval: Duration::from_millis(100),
268            enable_retry: true,
269            enable_rate_limiting: true,
270            enable_hmac: true,
271        }
272    }
273}
274
275/// Webhook manager statistics
276#[derive(Debug, Clone, Default)]
277pub struct WebhookStats {
278    /// Total registered webhooks
279    pub total_webhooks: usize,
280    /// Active webhooks
281    pub active_webhooks: usize,
282    /// Total events queued
283    pub events_queued: u64,
284    /// Events delivered
285    pub events_delivered: u64,
286    /// Events failed
287    pub events_failed: u64,
288    /// Queue size
289    pub queue_size: usize,
290    /// Average delivery time
291    pub avg_delivery_time: Duration,
292    /// Rate limit hits
293    pub rate_limit_hits: u64,
294}
295
296/// Webhook notification events
297#[derive(Debug, Clone)]
298pub enum WebhookNotification {
299    /// Webhook registered
300    WebhookRegistered { id: String, url: String },
301    /// Webhook delivery succeeded
302    DeliverySucceeded {
303        webhook_id: String,
304        event_id: String,
305        duration: Duration,
306    },
307    /// Webhook delivery failed
308    DeliveryFailed {
309        webhook_id: String,
310        event_id: String,
311        error: String,
312        attempts: u32,
313    },
314    /// Webhook disabled
315    WebhookDisabled { id: String, reason: String },
316    /// Rate limit exceeded
317    RateLimitExceeded { webhook_id: String },
318}
319
320/// Rate limiter for webhook deliveries
321struct RateLimiter {
322    /// Rate limits per webhook
323    limits: HashMap<String, TokenBucket>,
324    /// Global rate limit
325    global_limit: TokenBucket,
326}
327
328/// Token bucket for rate limiting
329#[derive(Debug, Clone)]
330struct TokenBucket {
331    /// Available tokens
332    tokens: f64,
333    /// Token capacity
334    capacity: f64,
335    /// Refill rate (tokens per second)
336    refill_rate: f64,
337    /// Last refill time
338    last_refill: Instant,
339}
340
341/// Webhook registration parameters
342#[allow(clippy::too_many_arguments)]
343pub struct WebhookRegistration {
344    pub url: String,
345    pub method: HttpMethod,
346    pub headers: HashMap<String, String>,
347    pub filters: Vec<EventFilter>,
348    pub security: WebhookSecurity,
349    pub retry_config: RetryConfig,
350    pub rate_limit: RateLimit,
351    pub metadata: WebhookMetadata,
352}
353
354impl WebhookManager {
355    /// Create a new webhook manager
356    pub async fn new(config: WebhookConfig) -> Result<Self> {
357        let client = reqwest::Client::builder()
358            .timeout(config.delivery_timeout)
359            .build()?;
360
361        let (tx, _) = broadcast::channel(1000);
362
363        Ok(Self {
364            webhooks: Arc::new(RwLock::new(HashMap::new())),
365            client,
366            event_queue: Arc::new(RwLock::new(VecDeque::new())),
367            config,
368            stats: Arc::new(RwLock::new(WebhookStats::default())),
369            event_notifier: tx,
370            rate_limiter: Arc::new(RwLock::new(RateLimiter::new())),
371        })
372    }
373
374    /// Start the webhook manager
375    pub async fn start(&self) -> Result<()> {
376        // Start queue processors
377        for i in 0..self.config.worker_threads {
378            self.start_queue_processor(i).await;
379        }
380
381        // Start rate limiter refill
382        self.start_rate_limiter_refill().await;
383
384        info!(
385            "Webhook manager started with {} workers",
386            self.config.worker_threads
387        );
388        Ok(())
389    }
390
391    /// Register a webhook
392    pub async fn register_webhook(&self, registration: WebhookRegistration) -> Result<String> {
393        let WebhookRegistration {
394            url,
395            method,
396            headers,
397            filters,
398            security,
399            retry_config,
400            rate_limit,
401            metadata,
402        } = registration;
403        // Check limits
404        let webhooks = self.webhooks.read().await;
405        if webhooks.len() >= self.config.max_webhooks {
406            return Err(anyhow!("Maximum webhook limit reached"));
407        }
408        drop(webhooks);
409
410        // Validate URL
411        let parsed_url = reqwest::Url::parse(&url).map_err(|_| anyhow!("Invalid webhook URL"))?;
412
413        if !parsed_url.scheme().starts_with("http") {
414            return Err(anyhow!("Webhook URL must use HTTP or HTTPS"));
415        }
416
417        // Generate webhook ID
418        let webhook_id = Uuid::new_v4().to_string();
419
420        // Create registered webhook
421        let webhook = RegisteredWebhook {
422            id: webhook_id.clone(),
423            url: url.clone(),
424            method,
425            headers,
426            filters,
427            security,
428            retry_config,
429            rate_limit: rate_limit.clone(),
430            metadata,
431            stats: WebhookStatistics::default(),
432            created_at: Instant::now(),
433            last_delivery: None,
434            status: WebhookStatus::Active,
435        };
436
437        // Register webhook
438        self.webhooks
439            .write()
440            .await
441            .insert(webhook_id.clone(), webhook);
442
443        // Initialize rate limiter for this webhook
444        self.rate_limiter
445            .write()
446            .await
447            .add_webhook(&webhook_id, rate_limit);
448
449        // Update statistics
450        let mut stats = self.stats.write().await;
451        stats.total_webhooks += 1;
452        stats.active_webhooks = self.webhooks.read().await.len();
453        drop(stats);
454
455        // Notify
456        let _ = self
457            .event_notifier
458            .send(WebhookNotification::WebhookRegistered {
459                id: webhook_id.clone(),
460                url,
461            });
462
463        info!("Registered webhook: {webhook_id}");
464        Ok(webhook_id)
465    }
466
467    /// Unregister a webhook
468    pub async fn unregister_webhook(&self, webhook_id: &str) -> Result<()> {
469        let mut webhooks = self.webhooks.write().await;
470        webhooks
471            .remove(webhook_id)
472            .ok_or_else(|| anyhow!("Webhook not found"))?;
473
474        // Remove from rate limiter
475        self.rate_limiter.write().await.remove_webhook(webhook_id);
476
477        // Update statistics
478        self.stats.write().await.active_webhooks = webhooks.len();
479
480        info!("Unregistered webhook: {webhook_id}");
481        Ok(())
482    }
483
484    /// Send event to webhooks
485    pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
486        let webhooks = self.webhooks.read().await;
487        let mut matching_webhooks = Vec::new();
488
489        // Find matching webhooks
490        for webhook in webhooks.values() {
491            if webhook.status == WebhookStatus::Active
492                && self.matches_filters(&event, &webhook.filters)
493            {
494                matching_webhooks.push(webhook.id.clone());
495            }
496        }
497        drop(webhooks);
498
499        if matching_webhooks.is_empty() {
500            return Ok(());
501        }
502
503        // Create webhook payload
504        let payload = self.create_payload(&event)?;
505
506        // Queue events for delivery
507        let mut queue = self.event_queue.write().await;
508        for webhook_id in matching_webhooks {
509            if queue.len() >= self.config.max_queue_size {
510                warn!("Webhook queue full, dropping event");
511                break;
512            }
513
514            let webhook_event = WebhookEvent {
515                id: Uuid::new_v4().to_string(),
516                webhook_id,
517                payload: payload.clone(),
518                attempts: 0,
519                created_at: Instant::now(),
520                next_retry: None,
521            };
522
523            queue.push_back(webhook_event);
524            self.stats.write().await.events_queued += 1;
525        }
526
527        Ok(())
528    }
529
530    /// Check if event matches webhook filters
531    fn matches_filters(&self, event: &StreamEvent, filters: &[EventFilter]) -> bool {
532        if filters.is_empty() {
533            return true;
534        }
535
536        filters.iter().any(|filter| {
537            // Check event type filter
538            if let Some(event_types) = &filter.event_types {
539                let event_type = match event {
540                    StreamEvent::TripleAdded { .. } => "triple_added",
541                    StreamEvent::TripleRemoved { .. } => "triple_removed",
542                    StreamEvent::GraphCreated { .. } => "graph_created",
543                    StreamEvent::GraphDeleted { .. } => "graph_deleted",
544                    StreamEvent::GraphCleared { .. } => "graph_cleared",
545                    _ => "unknown",
546                };
547
548                if !event_types.contains(&event_type.to_string()) {
549                    return false;
550                }
551            }
552
553            // Check graph filter
554            if let Some(graph_filter) = &filter.graph_filter {
555                let event_graph = match event {
556                    StreamEvent::TripleAdded { graph, .. }
557                    | StreamEvent::TripleRemoved { graph, .. } => graph.as_ref(),
558                    StreamEvent::GraphCreated { graph, .. }
559                    | StreamEvent::GraphDeleted { graph, .. } => Some(graph),
560                    StreamEvent::GraphCleared { graph, .. } => graph.as_ref(),
561                    _ => None,
562                };
563
564                if event_graph != Some(graph_filter) {
565                    return false;
566                }
567            }
568
569            // Check subject pattern (using simple contains for now)
570            if let Some(pattern) = &filter.subject_pattern {
571                let subject = match event {
572                    StreamEvent::TripleAdded { subject, .. }
573                    | StreamEvent::TripleRemoved { subject, .. } => Some(subject),
574                    _ => None,
575                };
576
577                if let Some(subj) = subject {
578                    if !subj.contains(pattern) {
579                        return false;
580                    }
581                } else {
582                    return false;
583                }
584            }
585
586            // Check predicate filter
587            if let Some(pred_filter) = &filter.predicate_filter {
588                let predicate = match event {
589                    StreamEvent::TripleAdded { predicate, .. }
590                    | StreamEvent::TripleRemoved { predicate, .. } => Some(predicate),
591                    _ => None,
592                };
593
594                if predicate != Some(pred_filter) {
595                    return false;
596                }
597            }
598
599            true
600        })
601    }
602
603    /// Create webhook payload from stream event
604    fn create_payload(&self, event: &StreamEvent) -> Result<WebhookPayload> {
605        let (event_type, data) = match event {
606            StreamEvent::TripleAdded {
607                subject,
608                predicate,
609                object,
610                graph,
611                metadata,
612            } => (
613                "triple_added",
614                serde_json::json!({
615                    "subject": subject,
616                    "predicate": predicate,
617                    "object": object,
618                    "graph": graph,
619                    "metadata": metadata
620                }),
621            ),
622            StreamEvent::TripleRemoved {
623                subject,
624                predicate,
625                object,
626                graph,
627                metadata,
628            } => (
629                "triple_removed",
630                serde_json::json!({
631                    "subject": subject,
632                    "predicate": predicate,
633                    "object": object,
634                    "graph": graph,
635                    "metadata": metadata
636                }),
637            ),
638            StreamEvent::GraphCreated { graph, metadata } => (
639                "graph_created",
640                serde_json::json!({
641                    "graph": graph,
642                    "metadata": metadata
643                }),
644            ),
645            StreamEvent::GraphDeleted { graph, metadata } => (
646                "graph_deleted",
647                serde_json::json!({
648                    "graph": graph,
649                    "metadata": metadata
650                }),
651            ),
652            StreamEvent::GraphCleared { graph, metadata } => (
653                "graph_cleared",
654                serde_json::json!({
655                    "graph": graph,
656                    "metadata": metadata
657                }),
658            ),
659            _ => return Err(anyhow!("Unsupported event type for webhook")),
660        };
661
662        Ok(WebhookPayload {
663            event_id: Uuid::new_v4().to_string(),
664            timestamp: chrono::Utc::now(),
665            event_type: event_type.to_string(),
666            data,
667            metadata: HashMap::new(),
668        })
669    }
670
671    /// Start queue processor
672    async fn start_queue_processor(&self, worker_id: usize) {
673        let queue = self.event_queue.clone();
674        let webhooks = self.webhooks.clone();
675        let client = self.client.clone();
676        let config = self.config.clone();
677        let stats = self.stats.clone();
678        let event_notifier = self.event_notifier.clone();
679        let rate_limiter = self.rate_limiter.clone();
680
681        tokio::spawn(async move {
682            let mut interval = interval(config.queue_interval);
683
684            loop {
685                interval.tick().await;
686
687                // Get next event from queue
688                let event = {
689                    let mut queue_guard = queue.write().await;
690                    queue_guard.pop_front()
691                };
692
693                if let Some(mut event) = event {
694                    // Check if it's time to retry
695                    if let Some(next_retry) = event.next_retry {
696                        if Instant::now() < next_retry {
697                            // Put back in queue
698                            queue.write().await.push_back(event);
699                            continue;
700                        }
701                    }
702
703                    // Get webhook details
704                    let webhook = {
705                        let webhooks_guard = webhooks.read().await;
706                        webhooks_guard.get(&event.webhook_id).cloned()
707                    };
708
709                    if let Some(webhook) = webhook {
710                        // Check rate limit
711                        if config.enable_rate_limiting {
712                            let allowed = rate_limiter.write().await.check_rate_limit(&webhook.id);
713                            if !allowed {
714                                // Put back in queue and skip
715                                queue.write().await.push_back(event);
716                                stats.write().await.rate_limit_hits += 1;
717                                let _ =
718                                    event_notifier.send(WebhookNotification::RateLimitExceeded {
719                                        webhook_id: webhook.id.clone(),
720                                    });
721                                continue;
722                            }
723                        }
724
725                        // Attempt delivery
726                        event.attempts += 1;
727                        let start_time = Instant::now();
728
729                        match Self::deliver_webhook(&client, &webhook, &event.payload, &config)
730                            .await
731                        {
732                            Ok(duration) => {
733                                // Success
734                                Self::update_webhook_stats(&webhooks, &webhook.id, true, duration)
735                                    .await;
736                                stats.write().await.events_delivered += 1;
737
738                                let _ =
739                                    event_notifier.send(WebhookNotification::DeliverySucceeded {
740                                        webhook_id: webhook.id.clone(),
741                                        event_id: event.id.clone(),
742                                        duration,
743                                    });
744
745                                debug!(
746                                    "Webhook delivery succeeded: {} -> {}",
747                                    event.id, webhook.id
748                                );
749                            }
750                            Err(e) => {
751                                // Failure
752                                let duration = start_time.elapsed();
753                                Self::update_webhook_stats(&webhooks, &webhook.id, false, duration)
754                                    .await;
755
756                                error!(
757                                    "Webhook delivery failed: {} -> {}: {e}",
758                                    event.id, webhook.id
759                                );
760
761                                // Check if we should retry
762                                if config.enable_retry
763                                    && event.attempts < webhook.retry_config.max_attempts
764                                {
765                                    // Calculate next retry time
766                                    let delay = Self::calculate_retry_delay(
767                                        &webhook.retry_config,
768                                        event.attempts,
769                                    );
770                                    event.next_retry = Some(Instant::now() + delay);
771
772                                    // Put back in queue
773                                    queue.write().await.push_back(event.clone());
774
775                                    debug!("Scheduling retry for {} in {delay:?}", event.id);
776                                } else {
777                                    // Max retries reached
778                                    stats.write().await.events_failed += 1;
779
780                                    let _ =
781                                        event_notifier.send(WebhookNotification::DeliveryFailed {
782                                            webhook_id: webhook.id.clone(),
783                                            event_id: event.id.clone(),
784                                            error: e.to_string(),
785                                            attempts: event.attempts,
786                                        });
787
788                                    // Check if webhook should be disabled
789                                    Self::check_webhook_health(
790                                        &webhooks,
791                                        &webhook.id,
792                                        &event_notifier,
793                                    )
794                                    .await;
795                                }
796                            }
797                        }
798                    }
799                }
800
801                // Update queue size stat
802                stats.write().await.queue_size = queue.read().await.len();
803            }
804        });
805
806        debug!("Started webhook queue processor {worker_id}");
807    }
808
809    /// Deliver webhook
810    async fn deliver_webhook(
811        client: &reqwest::Client,
812        webhook: &RegisteredWebhook,
813        payload: &WebhookPayload,
814        config: &WebhookConfig,
815    ) -> Result<Duration> {
816        let start_time = Instant::now();
817
818        // Prepare request
819        let mut request = match webhook.method {
820            HttpMethod::Get => client.get(&webhook.url),
821            HttpMethod::Post => client.post(&webhook.url),
822            HttpMethod::Put => client.put(&webhook.url),
823            HttpMethod::Patch => client.patch(&webhook.url),
824            HttpMethod::Delete => client.delete(&webhook.url),
825        };
826
827        // Add headers
828        for (key, value) in &webhook.headers {
829            request = request.header(key, value);
830        }
831
832        // Add security headers
833        for (key, value) in &webhook.security.auth_headers {
834            request = request.header(key, value);
835        }
836
837        // Add HMAC signature if enabled
838        if config.enable_hmac {
839            if let Some(secret) = &webhook.security.hmac_secret {
840                let signature = Self::calculate_hmac_signature(payload, secret)?;
841                request = request.header("X-Webhook-Signature", signature);
842            }
843        }
844
845        // Add timestamp
846        let timestamp = SystemTime::now()
847            .duration_since(UNIX_EPOCH)
848            .expect("SystemTime should be after UNIX_EPOCH")
849            .as_secs();
850        request = request.header("X-Webhook-Timestamp", timestamp.to_string());
851
852        // Set JSON body for non-GET requests
853        if webhook.method != HttpMethod::Get {
854            request = request.json(payload);
855        }
856
857        // Send request
858        let response = request
859            .send()
860            .await
861            .map_err(|e| anyhow!("Request failed: {e}"))?;
862
863        let status = response.status();
864
865        // Check if response is acceptable
866        if webhook.security.allowed_response_codes.is_empty() {
867            if !status.is_success() {
868                return Err(anyhow!(
869                    "HTTP {}: {}",
870                    status.as_u16(),
871                    response.text().await.unwrap_or_default()
872                ));
873            }
874        } else if !webhook
875            .security
876            .allowed_response_codes
877            .contains(&status.as_u16())
878        {
879            return Err(anyhow!("Unexpected response code: {}", status.as_u16()));
880        }
881
882        Ok(start_time.elapsed())
883    }
884
885    /// Calculate HMAC signature
886    fn calculate_hmac_signature(payload: &WebhookPayload, secret: &str) -> Result<String> {
887        use hmac::{Hmac, Mac};
888        use sha2::Sha256;
889
890        type HmacSha256 = Hmac<Sha256>;
891
892        let payload_json = serde_json::to_string(payload)?;
893        let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
894            .map_err(|_| anyhow!("Invalid HMAC key"))?;
895        mac.update(payload_json.as_bytes());
896
897        let result = mac.finalize();
898        Ok(format!("sha256={}", hex::encode(result.into_bytes())))
899    }
900
901    /// Calculate retry delay
902    fn calculate_retry_delay(config: &RetryConfig, attempt: u32) -> Duration {
903        let base_delay = config.initial_delay.as_millis() as f64;
904        let multiplier = config.backoff_multiplier.powi(attempt as i32 - 1);
905        let delay_ms = (base_delay * multiplier) as u64;
906
907        let delay = Duration::from_millis(delay_ms.min(config.max_delay.as_millis() as u64));
908
909        if config.enable_jitter {
910            // Add random jitter (±10%)
911            let jitter = (delay.as_millis() as f64
912                * 0.1
913                * ({
914                    let mut random = Random::default();
915                    random.random::<f64>()
916                } - 0.5)) as u64;
917            delay + Duration::from_millis(jitter)
918        } else {
919            delay
920        }
921    }
922
923    /// Update webhook statistics
924    async fn update_webhook_stats(
925        webhooks: &Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
926        webhook_id: &str,
927        success: bool,
928        duration: Duration,
929    ) {
930        let mut webhooks_guard = webhooks.write().await;
931        if let Some(webhook) = webhooks_guard.get_mut(webhook_id) {
932            webhook.stats.total_attempts += 1;
933            webhook.last_delivery = Some(Instant::now());
934
935            if success {
936                webhook.stats.successful_deliveries += 1;
937                webhook.stats.last_success = Some(Instant::now());
938                webhook.stats.consecutive_failures = 0;
939
940                // Update average response time
941                let count = webhook.stats.successful_deliveries;
942                webhook.stats.avg_response_time = Duration::from_millis(
943                    (webhook.stats.avg_response_time.as_millis() as u64 * (count - 1)
944                        + duration.as_millis() as u64)
945                        / count,
946                );
947            } else {
948                webhook.stats.failed_deliveries += 1;
949                webhook.stats.last_failure = Some(Instant::now());
950                webhook.stats.consecutive_failures += 1;
951            }
952        }
953    }
954
955    /// Check webhook health and disable if necessary
956    async fn check_webhook_health(
957        webhooks: &Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
958        webhook_id: &str,
959        event_notifier: &broadcast::Sender<WebhookNotification>,
960    ) {
961        let mut webhooks_guard = webhooks.write().await;
962        if let Some(webhook) = webhooks_guard.get_mut(webhook_id) {
963            // Disable webhook if too many consecutive failures
964            if webhook.stats.consecutive_failures >= 10 {
965                let reason = format!(
966                    "Too many consecutive failures: {}",
967                    webhook.stats.consecutive_failures
968                );
969                webhook.status = WebhookStatus::Disabled {
970                    reason: reason.clone(),
971                };
972
973                let _ = event_notifier.send(WebhookNotification::WebhookDisabled {
974                    id: webhook_id.to_string(),
975                    reason,
976                });
977
978                warn!("Disabled webhook {webhook_id} due to consecutive failures");
979            }
980        }
981    }
982
983    /// Start rate limiter refill
984    async fn start_rate_limiter_refill(&self) {
985        let rate_limiter = self.rate_limiter.clone();
986
987        tokio::spawn(async move {
988            let mut interval = interval(Duration::from_millis(100));
989
990            loop {
991                interval.tick().await;
992                rate_limiter.write().await.refill_tokens();
993            }
994        });
995    }
996
997    /// Get webhook statistics
998    pub async fn get_webhook_stats(&self, webhook_id: &str) -> Result<WebhookStatistics> {
999        let webhooks = self.webhooks.read().await;
1000        let webhook = webhooks
1001            .get(webhook_id)
1002            .ok_or_else(|| anyhow!("Webhook not found"))?;
1003
1004        Ok(webhook.stats.clone())
1005    }
1006
1007    /// Get manager statistics
1008    pub async fn get_stats(&self) -> WebhookStats {
1009        self.stats.read().await.clone()
1010    }
1011
1012    /// List all webhooks
1013    pub async fn list_webhooks(&self) -> Vec<WebhookInfo> {
1014        let webhooks = self.webhooks.read().await;
1015        webhooks
1016            .values()
1017            .map(|w| WebhookInfo {
1018                id: w.id.clone(),
1019                url: w.url.clone(),
1020                method: w.method.clone(),
1021                status: format!("{:?}", w.status),
1022                created_at: w.created_at.elapsed(),
1023                last_delivery: w.last_delivery.map(|t| t.elapsed()),
1024                success_rate: if w.stats.total_attempts > 0 {
1025                    w.stats.successful_deliveries as f64 / w.stats.total_attempts as f64
1026                } else {
1027                    0.0
1028                },
1029            })
1030            .collect()
1031    }
1032
1033    /// Subscribe to webhook notifications
1034    pub fn subscribe(&self) -> broadcast::Receiver<WebhookNotification> {
1035        self.event_notifier.subscribe()
1036    }
1037}
1038
1039/// Webhook information
1040#[derive(Debug, Clone, Serialize, Deserialize)]
1041pub struct WebhookInfo {
1042    pub id: String,
1043    pub url: String,
1044    pub method: HttpMethod,
1045    pub status: String,
1046    pub created_at: Duration,
1047    pub last_delivery: Option<Duration>,
1048    pub success_rate: f64,
1049}
1050
1051impl RateLimiter {
1052    /// Create a new rate limiter
1053    fn new() -> Self {
1054        Self {
1055            limits: HashMap::new(),
1056            global_limit: TokenBucket::new(100.0, 200), // Global limit
1057        }
1058    }
1059
1060    /// Add webhook to rate limiter
1061    fn add_webhook(&mut self, webhook_id: &str, config: RateLimit) {
1062        let bucket = TokenBucket::new(config.requests_per_second, config.burst_size);
1063        self.limits.insert(webhook_id.to_string(), bucket);
1064    }
1065
1066    /// Remove webhook from rate limiter
1067    fn remove_webhook(&mut self, webhook_id: &str) {
1068        self.limits.remove(webhook_id);
1069    }
1070
1071    /// Check rate limit
1072    fn check_rate_limit(&mut self, webhook_id: &str) -> bool {
1073        // Check global limit first
1074        if !self.global_limit.consume(1.0) {
1075            return false;
1076        }
1077
1078        // Check webhook-specific limit
1079        if let Some(bucket) = self.limits.get_mut(webhook_id) {
1080            bucket.consume(1.0)
1081        } else {
1082            true // No limit configured
1083        }
1084    }
1085
1086    /// Refill tokens
1087    fn refill_tokens(&mut self) {
1088        self.global_limit.refill();
1089        for bucket in self.limits.values_mut() {
1090            bucket.refill();
1091        }
1092    }
1093}
1094
1095impl TokenBucket {
1096    /// Create a new token bucket
1097    fn new(refill_rate: f64, capacity: u32) -> Self {
1098        Self {
1099            tokens: capacity as f64,
1100            capacity: capacity as f64,
1101            refill_rate,
1102            last_refill: Instant::now(),
1103        }
1104    }
1105
1106    /// Consume tokens
1107    fn consume(&mut self, amount: f64) -> bool {
1108        self.refill();
1109
1110        if self.tokens >= amount {
1111            self.tokens -= amount;
1112            true
1113        } else {
1114            false
1115        }
1116    }
1117
1118    /// Refill tokens
1119    fn refill(&mut self) {
1120        let now = Instant::now();
1121        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1122
1123        let tokens_to_add = elapsed * self.refill_rate;
1124        self.tokens = (self.tokens + tokens_to_add).min(self.capacity);
1125        self.last_refill = now;
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use crate::event::EventMetadata;
1133    use std::collections::HashMap;
1134
1135    #[tokio::test]
1136    async fn test_webhook_registration() {
1137        let manager = WebhookManager::new(WebhookConfig::default()).await.unwrap();
1138
1139        let webhook_id = manager
1140            .register_webhook(WebhookRegistration {
1141                url: "https://example.com/webhook".to_string(),
1142                method: HttpMethod::Post,
1143                headers: HashMap::new(),
1144                filters: vec![],
1145                security: WebhookSecurity {
1146                    hmac_secret: None,
1147                    auth_headers: HashMap::new(),
1148                    verify_ssl: true,
1149                    allowed_response_codes: vec![],
1150                },
1151                retry_config: RetryConfig::default(),
1152                rate_limit: RateLimit::default(),
1153                metadata: WebhookMetadata {
1154                    name: Some("test_webhook".to_string()),
1155                    description: Some("Test webhook".to_string()),
1156                    owner: None,
1157                    tags: vec![],
1158                    properties: HashMap::new(),
1159                },
1160            })
1161            .await
1162            .unwrap();
1163
1164        assert!(!webhook_id.is_empty());
1165
1166        let webhooks = manager.list_webhooks().await;
1167        assert_eq!(webhooks.len(), 1);
1168        assert_eq!(webhooks[0].id, webhook_id);
1169    }
1170
1171    #[tokio::test]
1172    async fn test_event_filtering() {
1173        let manager = WebhookManager::new(WebhookConfig::default()).await.unwrap();
1174
1175        let filter = EventFilter {
1176            event_types: Some(vec!["triple_added".to_string()]),
1177            graph_filter: None,
1178            subject_pattern: None,
1179            predicate_filter: None,
1180            custom_filter: None,
1181        };
1182
1183        let event_match = StreamEvent::TripleAdded {
1184            subject: "test:subject".to_string(),
1185            predicate: "test:predicate".to_string(),
1186            object: "test:object".to_string(),
1187            graph: None,
1188            metadata: EventMetadata {
1189                event_id: "test".to_string(),
1190                timestamp: chrono::Utc::now(),
1191                source: "test".to_string(),
1192                user: None,
1193                context: None,
1194                caused_by: None,
1195                version: "1.0".to_string(),
1196                properties: HashMap::new(),
1197                checksum: None,
1198            },
1199        };
1200
1201        let event_no_match = StreamEvent::GraphCreated {
1202            graph: "test:graph".to_string(),
1203            metadata: EventMetadata {
1204                event_id: "test".to_string(),
1205                timestamp: chrono::Utc::now(),
1206                source: "test".to_string(),
1207                user: None,
1208                context: None,
1209                caused_by: None,
1210                version: "1.0".to_string(),
1211                properties: HashMap::new(),
1212                checksum: None,
1213            },
1214        };
1215
1216        assert!(manager.matches_filters(&event_match, std::slice::from_ref(&filter)));
1217        assert!(!manager.matches_filters(&event_no_match, std::slice::from_ref(&filter)));
1218    }
1219
1220    #[test]
1221    fn test_token_bucket() {
1222        let mut bucket = TokenBucket::new(10.0, 20);
1223
1224        // Should be able to consume up to capacity
1225        assert!(bucket.consume(20.0));
1226        assert!(!bucket.consume(1.0));
1227
1228        // Wait and refill
1229        std::thread::sleep(Duration::from_millis(100));
1230        bucket.refill();
1231        assert!(bucket.consume(1.0));
1232    }
1233}