1use anyhow::{anyhow, Result};
11use scirs2_core::random::{Random, Rng};
13use serde::{Deserialize, Serialize};
14use serde_json;
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18use tokio::sync::{broadcast, RwLock};
19use tokio::time::interval;
20use tracing::{debug, error, info, warn};
21use uuid::Uuid;
22
23use crate::StreamEvent;
24
25pub struct WebhookManager {
27 webhooks: Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
29 client: reqwest::Client,
31 event_queue: Arc<RwLock<VecDeque<WebhookEvent>>>,
33 config: WebhookConfig,
35 stats: Arc<RwLock<WebhookStats>>,
37 event_notifier: broadcast::Sender<WebhookNotification>,
39 rate_limiter: Arc<RwLock<RateLimiter>>,
41}
42
43#[derive(Debug, Clone)]
45struct RegisteredWebhook {
46 id: String,
48 url: String,
50 method: HttpMethod,
52 headers: HashMap<String, String>,
54 filters: Vec<EventFilter>,
56 security: WebhookSecurity,
58 retry_config: RetryConfig,
60 rate_limit: RateLimit,
62 metadata: WebhookMetadata,
64 stats: WebhookStatistics,
66 created_at: Instant,
68 last_delivery: Option<Instant>,
70 status: WebhookStatus,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76pub enum HttpMethod {
77 Get,
78 Post,
79 Put,
80 Patch,
81 Delete,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct EventFilter {
87 pub event_types: Option<Vec<String>>,
89 pub graph_filter: Option<String>,
91 pub subject_pattern: Option<String>,
93 pub predicate_filter: Option<String>,
95 pub custom_filter: Option<String>,
97}
98
99#[derive(Debug, Clone)]
101pub struct WebhookSecurity {
102 hmac_secret: Option<String>,
104 auth_headers: HashMap<String, String>,
106 verify_ssl: bool,
108 allowed_response_codes: Vec<u16>,
110}
111
112#[derive(Debug, Clone)]
114pub struct RetryConfig {
115 pub max_attempts: u32,
117 pub initial_delay: Duration,
119 pub max_delay: Duration,
121 pub backoff_multiplier: f64,
123 pub enable_jitter: bool,
125}
126
127impl Default for RetryConfig {
128 fn default() -> Self {
129 Self {
130 max_attempts: 3,
131 initial_delay: Duration::from_millis(100),
132 max_delay: Duration::from_secs(30),
133 backoff_multiplier: 2.0,
134 enable_jitter: true,
135 }
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct RateLimit {
142 pub requests_per_second: f64,
144 pub burst_size: u32,
146 pub window: Duration,
148}
149
150impl Default for RateLimit {
151 fn default() -> Self {
152 Self {
153 requests_per_second: 10.0,
154 burst_size: 20,
155 window: Duration::from_secs(1),
156 }
157 }
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct WebhookMetadata {
163 pub name: Option<String>,
165 pub description: Option<String>,
167 pub owner: Option<String>,
169 pub tags: Vec<String>,
171 pub properties: HashMap<String, String>,
173}
174
175#[derive(Debug, Clone, Default)]
177pub struct WebhookStatistics {
178 pub total_attempts: u64,
180 pub successful_deliveries: u64,
182 pub failed_deliveries: u64,
184 pub avg_response_time: Duration,
186 pub last_success: Option<Instant>,
188 pub last_failure: Option<Instant>,
190 pub consecutive_failures: u32,
192}
193
194#[derive(Debug, Clone, PartialEq)]
196enum WebhookStatus {
197 Active,
199 Paused,
201 Disabled { reason: String },
203 Deleting,
205}
206
207#[derive(Debug, Clone)]
209struct WebhookEvent {
210 id: String,
212 webhook_id: String,
214 payload: WebhookPayload,
216 attempts: u32,
218 created_at: Instant,
220 next_retry: Option<Instant>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct WebhookPayload {
227 pub event_id: String,
229 pub timestamp: chrono::DateTime<chrono::Utc>,
231 pub event_type: String,
233 pub data: serde_json::Value,
235 pub metadata: HashMap<String, String>,
237}
238
239#[derive(Debug, Clone)]
241pub struct WebhookConfig {
242 pub max_webhooks: usize,
244 pub max_queue_size: usize,
246 pub worker_threads: usize,
248 pub delivery_timeout: Duration,
250 pub queue_interval: Duration,
252 pub enable_retry: bool,
254 pub enable_rate_limiting: bool,
256 pub enable_hmac: bool,
258}
259
260impl Default for WebhookConfig {
261 fn default() -> Self {
262 Self {
263 max_webhooks: 1000,
264 max_queue_size: 10000,
265 worker_threads: 4,
266 delivery_timeout: Duration::from_secs(30),
267 queue_interval: Duration::from_millis(100),
268 enable_retry: true,
269 enable_rate_limiting: true,
270 enable_hmac: true,
271 }
272 }
273}
274
275#[derive(Debug, Clone, Default)]
277pub struct WebhookStats {
278 pub total_webhooks: usize,
280 pub active_webhooks: usize,
282 pub events_queued: u64,
284 pub events_delivered: u64,
286 pub events_failed: u64,
288 pub queue_size: usize,
290 pub avg_delivery_time: Duration,
292 pub rate_limit_hits: u64,
294}
295
296#[derive(Debug, Clone)]
298pub enum WebhookNotification {
299 WebhookRegistered { id: String, url: String },
301 DeliverySucceeded {
303 webhook_id: String,
304 event_id: String,
305 duration: Duration,
306 },
307 DeliveryFailed {
309 webhook_id: String,
310 event_id: String,
311 error: String,
312 attempts: u32,
313 },
314 WebhookDisabled { id: String, reason: String },
316 RateLimitExceeded { webhook_id: String },
318}
319
320struct RateLimiter {
322 limits: HashMap<String, TokenBucket>,
324 global_limit: TokenBucket,
326}
327
328#[derive(Debug, Clone)]
330struct TokenBucket {
331 tokens: f64,
333 capacity: f64,
335 refill_rate: f64,
337 last_refill: Instant,
339}
340
341#[allow(clippy::too_many_arguments)]
343pub struct WebhookRegistration {
344 pub url: String,
345 pub method: HttpMethod,
346 pub headers: HashMap<String, String>,
347 pub filters: Vec<EventFilter>,
348 pub security: WebhookSecurity,
349 pub retry_config: RetryConfig,
350 pub rate_limit: RateLimit,
351 pub metadata: WebhookMetadata,
352}
353
354impl WebhookManager {
355 pub async fn new(config: WebhookConfig) -> Result<Self> {
357 let client = reqwest::Client::builder()
358 .timeout(config.delivery_timeout)
359 .build()?;
360
361 let (tx, _) = broadcast::channel(1000);
362
363 Ok(Self {
364 webhooks: Arc::new(RwLock::new(HashMap::new())),
365 client,
366 event_queue: Arc::new(RwLock::new(VecDeque::new())),
367 config,
368 stats: Arc::new(RwLock::new(WebhookStats::default())),
369 event_notifier: tx,
370 rate_limiter: Arc::new(RwLock::new(RateLimiter::new())),
371 })
372 }
373
374 pub async fn start(&self) -> Result<()> {
376 for i in 0..self.config.worker_threads {
378 self.start_queue_processor(i).await;
379 }
380
381 self.start_rate_limiter_refill().await;
383
384 info!(
385 "Webhook manager started with {} workers",
386 self.config.worker_threads
387 );
388 Ok(())
389 }
390
391 pub async fn register_webhook(&self, registration: WebhookRegistration) -> Result<String> {
393 let WebhookRegistration {
394 url,
395 method,
396 headers,
397 filters,
398 security,
399 retry_config,
400 rate_limit,
401 metadata,
402 } = registration;
403 let webhooks = self.webhooks.read().await;
405 if webhooks.len() >= self.config.max_webhooks {
406 return Err(anyhow!("Maximum webhook limit reached"));
407 }
408 drop(webhooks);
409
410 let parsed_url = reqwest::Url::parse(&url).map_err(|_| anyhow!("Invalid webhook URL"))?;
412
413 if !parsed_url.scheme().starts_with("http") {
414 return Err(anyhow!("Webhook URL must use HTTP or HTTPS"));
415 }
416
417 let webhook_id = Uuid::new_v4().to_string();
419
420 let webhook = RegisteredWebhook {
422 id: webhook_id.clone(),
423 url: url.clone(),
424 method,
425 headers,
426 filters,
427 security,
428 retry_config,
429 rate_limit: rate_limit.clone(),
430 metadata,
431 stats: WebhookStatistics::default(),
432 created_at: Instant::now(),
433 last_delivery: None,
434 status: WebhookStatus::Active,
435 };
436
437 self.webhooks
439 .write()
440 .await
441 .insert(webhook_id.clone(), webhook);
442
443 self.rate_limiter
445 .write()
446 .await
447 .add_webhook(&webhook_id, rate_limit);
448
449 let mut stats = self.stats.write().await;
451 stats.total_webhooks += 1;
452 stats.active_webhooks = self.webhooks.read().await.len();
453 drop(stats);
454
455 let _ = self
457 .event_notifier
458 .send(WebhookNotification::WebhookRegistered {
459 id: webhook_id.clone(),
460 url,
461 });
462
463 info!("Registered webhook: {webhook_id}");
464 Ok(webhook_id)
465 }
466
467 pub async fn unregister_webhook(&self, webhook_id: &str) -> Result<()> {
469 let mut webhooks = self.webhooks.write().await;
470 webhooks
471 .remove(webhook_id)
472 .ok_or_else(|| anyhow!("Webhook not found"))?;
473
474 self.rate_limiter.write().await.remove_webhook(webhook_id);
476
477 self.stats.write().await.active_webhooks = webhooks.len();
479
480 info!("Unregistered webhook: {webhook_id}");
481 Ok(())
482 }
483
484 pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
486 let webhooks = self.webhooks.read().await;
487 let mut matching_webhooks = Vec::new();
488
489 for webhook in webhooks.values() {
491 if webhook.status == WebhookStatus::Active
492 && self.matches_filters(&event, &webhook.filters)
493 {
494 matching_webhooks.push(webhook.id.clone());
495 }
496 }
497 drop(webhooks);
498
499 if matching_webhooks.is_empty() {
500 return Ok(());
501 }
502
503 let payload = self.create_payload(&event)?;
505
506 let mut queue = self.event_queue.write().await;
508 for webhook_id in matching_webhooks {
509 if queue.len() >= self.config.max_queue_size {
510 warn!("Webhook queue full, dropping event");
511 break;
512 }
513
514 let webhook_event = WebhookEvent {
515 id: Uuid::new_v4().to_string(),
516 webhook_id,
517 payload: payload.clone(),
518 attempts: 0,
519 created_at: Instant::now(),
520 next_retry: None,
521 };
522
523 queue.push_back(webhook_event);
524 self.stats.write().await.events_queued += 1;
525 }
526
527 Ok(())
528 }
529
530 fn matches_filters(&self, event: &StreamEvent, filters: &[EventFilter]) -> bool {
532 if filters.is_empty() {
533 return true;
534 }
535
536 filters.iter().any(|filter| {
537 if let Some(event_types) = &filter.event_types {
539 let event_type = match event {
540 StreamEvent::TripleAdded { .. } => "triple_added",
541 StreamEvent::TripleRemoved { .. } => "triple_removed",
542 StreamEvent::GraphCreated { .. } => "graph_created",
543 StreamEvent::GraphDeleted { .. } => "graph_deleted",
544 StreamEvent::GraphCleared { .. } => "graph_cleared",
545 _ => "unknown",
546 };
547
548 if !event_types.contains(&event_type.to_string()) {
549 return false;
550 }
551 }
552
553 if let Some(graph_filter) = &filter.graph_filter {
555 let event_graph = match event {
556 StreamEvent::TripleAdded { graph, .. }
557 | StreamEvent::TripleRemoved { graph, .. } => graph.as_ref(),
558 StreamEvent::GraphCreated { graph, .. }
559 | StreamEvent::GraphDeleted { graph, .. } => Some(graph),
560 StreamEvent::GraphCleared { graph, .. } => graph.as_ref(),
561 _ => None,
562 };
563
564 if event_graph != Some(graph_filter) {
565 return false;
566 }
567 }
568
569 if let Some(pattern) = &filter.subject_pattern {
571 let subject = match event {
572 StreamEvent::TripleAdded { subject, .. }
573 | StreamEvent::TripleRemoved { subject, .. } => Some(subject),
574 _ => None,
575 };
576
577 if let Some(subj) = subject {
578 if !subj.contains(pattern) {
579 return false;
580 }
581 } else {
582 return false;
583 }
584 }
585
586 if let Some(pred_filter) = &filter.predicate_filter {
588 let predicate = match event {
589 StreamEvent::TripleAdded { predicate, .. }
590 | StreamEvent::TripleRemoved { predicate, .. } => Some(predicate),
591 _ => None,
592 };
593
594 if predicate != Some(pred_filter) {
595 return false;
596 }
597 }
598
599 true
600 })
601 }
602
603 fn create_payload(&self, event: &StreamEvent) -> Result<WebhookPayload> {
605 let (event_type, data) = match event {
606 StreamEvent::TripleAdded {
607 subject,
608 predicate,
609 object,
610 graph,
611 metadata,
612 } => (
613 "triple_added",
614 serde_json::json!({
615 "subject": subject,
616 "predicate": predicate,
617 "object": object,
618 "graph": graph,
619 "metadata": metadata
620 }),
621 ),
622 StreamEvent::TripleRemoved {
623 subject,
624 predicate,
625 object,
626 graph,
627 metadata,
628 } => (
629 "triple_removed",
630 serde_json::json!({
631 "subject": subject,
632 "predicate": predicate,
633 "object": object,
634 "graph": graph,
635 "metadata": metadata
636 }),
637 ),
638 StreamEvent::GraphCreated { graph, metadata } => (
639 "graph_created",
640 serde_json::json!({
641 "graph": graph,
642 "metadata": metadata
643 }),
644 ),
645 StreamEvent::GraphDeleted { graph, metadata } => (
646 "graph_deleted",
647 serde_json::json!({
648 "graph": graph,
649 "metadata": metadata
650 }),
651 ),
652 StreamEvent::GraphCleared { graph, metadata } => (
653 "graph_cleared",
654 serde_json::json!({
655 "graph": graph,
656 "metadata": metadata
657 }),
658 ),
659 _ => return Err(anyhow!("Unsupported event type for webhook")),
660 };
661
662 Ok(WebhookPayload {
663 event_id: Uuid::new_v4().to_string(),
664 timestamp: chrono::Utc::now(),
665 event_type: event_type.to_string(),
666 data,
667 metadata: HashMap::new(),
668 })
669 }
670
671 async fn start_queue_processor(&self, worker_id: usize) {
673 let queue = self.event_queue.clone();
674 let webhooks = self.webhooks.clone();
675 let client = self.client.clone();
676 let config = self.config.clone();
677 let stats = self.stats.clone();
678 let event_notifier = self.event_notifier.clone();
679 let rate_limiter = self.rate_limiter.clone();
680
681 tokio::spawn(async move {
682 let mut interval = interval(config.queue_interval);
683
684 loop {
685 interval.tick().await;
686
687 let event = {
689 let mut queue_guard = queue.write().await;
690 queue_guard.pop_front()
691 };
692
693 if let Some(mut event) = event {
694 if let Some(next_retry) = event.next_retry {
696 if Instant::now() < next_retry {
697 queue.write().await.push_back(event);
699 continue;
700 }
701 }
702
703 let webhook = {
705 let webhooks_guard = webhooks.read().await;
706 webhooks_guard.get(&event.webhook_id).cloned()
707 };
708
709 if let Some(webhook) = webhook {
710 if config.enable_rate_limiting {
712 let allowed = rate_limiter.write().await.check_rate_limit(&webhook.id);
713 if !allowed {
714 queue.write().await.push_back(event);
716 stats.write().await.rate_limit_hits += 1;
717 let _ =
718 event_notifier.send(WebhookNotification::RateLimitExceeded {
719 webhook_id: webhook.id.clone(),
720 });
721 continue;
722 }
723 }
724
725 event.attempts += 1;
727 let start_time = Instant::now();
728
729 match Self::deliver_webhook(&client, &webhook, &event.payload, &config)
730 .await
731 {
732 Ok(duration) => {
733 Self::update_webhook_stats(&webhooks, &webhook.id, true, duration)
735 .await;
736 stats.write().await.events_delivered += 1;
737
738 let _ =
739 event_notifier.send(WebhookNotification::DeliverySucceeded {
740 webhook_id: webhook.id.clone(),
741 event_id: event.id.clone(),
742 duration,
743 });
744
745 debug!(
746 "Webhook delivery succeeded: {} -> {}",
747 event.id, webhook.id
748 );
749 }
750 Err(e) => {
751 let duration = start_time.elapsed();
753 Self::update_webhook_stats(&webhooks, &webhook.id, false, duration)
754 .await;
755
756 error!(
757 "Webhook delivery failed: {} -> {}: {e}",
758 event.id, webhook.id
759 );
760
761 if config.enable_retry
763 && event.attempts < webhook.retry_config.max_attempts
764 {
765 let delay = Self::calculate_retry_delay(
767 &webhook.retry_config,
768 event.attempts,
769 );
770 event.next_retry = Some(Instant::now() + delay);
771
772 queue.write().await.push_back(event.clone());
774
775 debug!("Scheduling retry for {} in {delay:?}", event.id);
776 } else {
777 stats.write().await.events_failed += 1;
779
780 let _ =
781 event_notifier.send(WebhookNotification::DeliveryFailed {
782 webhook_id: webhook.id.clone(),
783 event_id: event.id.clone(),
784 error: e.to_string(),
785 attempts: event.attempts,
786 });
787
788 Self::check_webhook_health(
790 &webhooks,
791 &webhook.id,
792 &event_notifier,
793 )
794 .await;
795 }
796 }
797 }
798 }
799 }
800
801 stats.write().await.queue_size = queue.read().await.len();
803 }
804 });
805
806 debug!("Started webhook queue processor {worker_id}");
807 }
808
809 async fn deliver_webhook(
811 client: &reqwest::Client,
812 webhook: &RegisteredWebhook,
813 payload: &WebhookPayload,
814 config: &WebhookConfig,
815 ) -> Result<Duration> {
816 let start_time = Instant::now();
817
818 let mut request = match webhook.method {
820 HttpMethod::Get => client.get(&webhook.url),
821 HttpMethod::Post => client.post(&webhook.url),
822 HttpMethod::Put => client.put(&webhook.url),
823 HttpMethod::Patch => client.patch(&webhook.url),
824 HttpMethod::Delete => client.delete(&webhook.url),
825 };
826
827 for (key, value) in &webhook.headers {
829 request = request.header(key, value);
830 }
831
832 for (key, value) in &webhook.security.auth_headers {
834 request = request.header(key, value);
835 }
836
837 if config.enable_hmac {
839 if let Some(secret) = &webhook.security.hmac_secret {
840 let signature = Self::calculate_hmac_signature(payload, secret)?;
841 request = request.header("X-Webhook-Signature", signature);
842 }
843 }
844
845 let timestamp = SystemTime::now()
847 .duration_since(UNIX_EPOCH)
848 .expect("SystemTime should be after UNIX_EPOCH")
849 .as_secs();
850 request = request.header("X-Webhook-Timestamp", timestamp.to_string());
851
852 if webhook.method != HttpMethod::Get {
854 request = request.json(payload);
855 }
856
857 let response = request
859 .send()
860 .await
861 .map_err(|e| anyhow!("Request failed: {e}"))?;
862
863 let status = response.status();
864
865 if webhook.security.allowed_response_codes.is_empty() {
867 if !status.is_success() {
868 return Err(anyhow!(
869 "HTTP {}: {}",
870 status.as_u16(),
871 response.text().await.unwrap_or_default()
872 ));
873 }
874 } else if !webhook
875 .security
876 .allowed_response_codes
877 .contains(&status.as_u16())
878 {
879 return Err(anyhow!("Unexpected response code: {}", status.as_u16()));
880 }
881
882 Ok(start_time.elapsed())
883 }
884
885 fn calculate_hmac_signature(payload: &WebhookPayload, secret: &str) -> Result<String> {
887 use hmac::{Hmac, Mac};
888 use sha2::Sha256;
889
890 type HmacSha256 = Hmac<Sha256>;
891
892 let payload_json = serde_json::to_string(payload)?;
893 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
894 .map_err(|_| anyhow!("Invalid HMAC key"))?;
895 mac.update(payload_json.as_bytes());
896
897 let result = mac.finalize();
898 Ok(format!("sha256={}", hex::encode(result.into_bytes())))
899 }
900
901 fn calculate_retry_delay(config: &RetryConfig, attempt: u32) -> Duration {
903 let base_delay = config.initial_delay.as_millis() as f64;
904 let multiplier = config.backoff_multiplier.powi(attempt as i32 - 1);
905 let delay_ms = (base_delay * multiplier) as u64;
906
907 let delay = Duration::from_millis(delay_ms.min(config.max_delay.as_millis() as u64));
908
909 if config.enable_jitter {
910 let jitter = (delay.as_millis() as f64
912 * 0.1
913 * ({
914 let mut random = Random::default();
915 random.random::<f64>()
916 } - 0.5)) as u64;
917 delay + Duration::from_millis(jitter)
918 } else {
919 delay
920 }
921 }
922
923 async fn update_webhook_stats(
925 webhooks: &Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
926 webhook_id: &str,
927 success: bool,
928 duration: Duration,
929 ) {
930 let mut webhooks_guard = webhooks.write().await;
931 if let Some(webhook) = webhooks_guard.get_mut(webhook_id) {
932 webhook.stats.total_attempts += 1;
933 webhook.last_delivery = Some(Instant::now());
934
935 if success {
936 webhook.stats.successful_deliveries += 1;
937 webhook.stats.last_success = Some(Instant::now());
938 webhook.stats.consecutive_failures = 0;
939
940 let count = webhook.stats.successful_deliveries;
942 webhook.stats.avg_response_time = Duration::from_millis(
943 (webhook.stats.avg_response_time.as_millis() as u64 * (count - 1)
944 + duration.as_millis() as u64)
945 / count,
946 );
947 } else {
948 webhook.stats.failed_deliveries += 1;
949 webhook.stats.last_failure = Some(Instant::now());
950 webhook.stats.consecutive_failures += 1;
951 }
952 }
953 }
954
955 async fn check_webhook_health(
957 webhooks: &Arc<RwLock<HashMap<String, RegisteredWebhook>>>,
958 webhook_id: &str,
959 event_notifier: &broadcast::Sender<WebhookNotification>,
960 ) {
961 let mut webhooks_guard = webhooks.write().await;
962 if let Some(webhook) = webhooks_guard.get_mut(webhook_id) {
963 if webhook.stats.consecutive_failures >= 10 {
965 let reason = format!(
966 "Too many consecutive failures: {}",
967 webhook.stats.consecutive_failures
968 );
969 webhook.status = WebhookStatus::Disabled {
970 reason: reason.clone(),
971 };
972
973 let _ = event_notifier.send(WebhookNotification::WebhookDisabled {
974 id: webhook_id.to_string(),
975 reason,
976 });
977
978 warn!("Disabled webhook {webhook_id} due to consecutive failures");
979 }
980 }
981 }
982
983 async fn start_rate_limiter_refill(&self) {
985 let rate_limiter = self.rate_limiter.clone();
986
987 tokio::spawn(async move {
988 let mut interval = interval(Duration::from_millis(100));
989
990 loop {
991 interval.tick().await;
992 rate_limiter.write().await.refill_tokens();
993 }
994 });
995 }
996
997 pub async fn get_webhook_stats(&self, webhook_id: &str) -> Result<WebhookStatistics> {
999 let webhooks = self.webhooks.read().await;
1000 let webhook = webhooks
1001 .get(webhook_id)
1002 .ok_or_else(|| anyhow!("Webhook not found"))?;
1003
1004 Ok(webhook.stats.clone())
1005 }
1006
1007 pub async fn get_stats(&self) -> WebhookStats {
1009 self.stats.read().await.clone()
1010 }
1011
1012 pub async fn list_webhooks(&self) -> Vec<WebhookInfo> {
1014 let webhooks = self.webhooks.read().await;
1015 webhooks
1016 .values()
1017 .map(|w| WebhookInfo {
1018 id: w.id.clone(),
1019 url: w.url.clone(),
1020 method: w.method.clone(),
1021 status: format!("{:?}", w.status),
1022 created_at: w.created_at.elapsed(),
1023 last_delivery: w.last_delivery.map(|t| t.elapsed()),
1024 success_rate: if w.stats.total_attempts > 0 {
1025 w.stats.successful_deliveries as f64 / w.stats.total_attempts as f64
1026 } else {
1027 0.0
1028 },
1029 })
1030 .collect()
1031 }
1032
1033 pub fn subscribe(&self) -> broadcast::Receiver<WebhookNotification> {
1035 self.event_notifier.subscribe()
1036 }
1037}
1038
1039#[derive(Debug, Clone, Serialize, Deserialize)]
1041pub struct WebhookInfo {
1042 pub id: String,
1043 pub url: String,
1044 pub method: HttpMethod,
1045 pub status: String,
1046 pub created_at: Duration,
1047 pub last_delivery: Option<Duration>,
1048 pub success_rate: f64,
1049}
1050
1051impl RateLimiter {
1052 fn new() -> Self {
1054 Self {
1055 limits: HashMap::new(),
1056 global_limit: TokenBucket::new(100.0, 200), }
1058 }
1059
1060 fn add_webhook(&mut self, webhook_id: &str, config: RateLimit) {
1062 let bucket = TokenBucket::new(config.requests_per_second, config.burst_size);
1063 self.limits.insert(webhook_id.to_string(), bucket);
1064 }
1065
1066 fn remove_webhook(&mut self, webhook_id: &str) {
1068 self.limits.remove(webhook_id);
1069 }
1070
1071 fn check_rate_limit(&mut self, webhook_id: &str) -> bool {
1073 if !self.global_limit.consume(1.0) {
1075 return false;
1076 }
1077
1078 if let Some(bucket) = self.limits.get_mut(webhook_id) {
1080 bucket.consume(1.0)
1081 } else {
1082 true }
1084 }
1085
1086 fn refill_tokens(&mut self) {
1088 self.global_limit.refill();
1089 for bucket in self.limits.values_mut() {
1090 bucket.refill();
1091 }
1092 }
1093}
1094
1095impl TokenBucket {
1096 fn new(refill_rate: f64, capacity: u32) -> Self {
1098 Self {
1099 tokens: capacity as f64,
1100 capacity: capacity as f64,
1101 refill_rate,
1102 last_refill: Instant::now(),
1103 }
1104 }
1105
1106 fn consume(&mut self, amount: f64) -> bool {
1108 self.refill();
1109
1110 if self.tokens >= amount {
1111 self.tokens -= amount;
1112 true
1113 } else {
1114 false
1115 }
1116 }
1117
1118 fn refill(&mut self) {
1120 let now = Instant::now();
1121 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1122
1123 let tokens_to_add = elapsed * self.refill_rate;
1124 self.tokens = (self.tokens + tokens_to_add).min(self.capacity);
1125 self.last_refill = now;
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use super::*;
1132 use crate::event::EventMetadata;
1133 use std::collections::HashMap;
1134
1135 #[tokio::test]
1136 async fn test_webhook_registration() {
1137 let manager = WebhookManager::new(WebhookConfig::default()).await.unwrap();
1138
1139 let webhook_id = manager
1140 .register_webhook(WebhookRegistration {
1141 url: "https://example.com/webhook".to_string(),
1142 method: HttpMethod::Post,
1143 headers: HashMap::new(),
1144 filters: vec![],
1145 security: WebhookSecurity {
1146 hmac_secret: None,
1147 auth_headers: HashMap::new(),
1148 verify_ssl: true,
1149 allowed_response_codes: vec![],
1150 },
1151 retry_config: RetryConfig::default(),
1152 rate_limit: RateLimit::default(),
1153 metadata: WebhookMetadata {
1154 name: Some("test_webhook".to_string()),
1155 description: Some("Test webhook".to_string()),
1156 owner: None,
1157 tags: vec![],
1158 properties: HashMap::new(),
1159 },
1160 })
1161 .await
1162 .unwrap();
1163
1164 assert!(!webhook_id.is_empty());
1165
1166 let webhooks = manager.list_webhooks().await;
1167 assert_eq!(webhooks.len(), 1);
1168 assert_eq!(webhooks[0].id, webhook_id);
1169 }
1170
1171 #[tokio::test]
1172 async fn test_event_filtering() {
1173 let manager = WebhookManager::new(WebhookConfig::default()).await.unwrap();
1174
1175 let filter = EventFilter {
1176 event_types: Some(vec!["triple_added".to_string()]),
1177 graph_filter: None,
1178 subject_pattern: None,
1179 predicate_filter: None,
1180 custom_filter: None,
1181 };
1182
1183 let event_match = StreamEvent::TripleAdded {
1184 subject: "test:subject".to_string(),
1185 predicate: "test:predicate".to_string(),
1186 object: "test:object".to_string(),
1187 graph: None,
1188 metadata: EventMetadata {
1189 event_id: "test".to_string(),
1190 timestamp: chrono::Utc::now(),
1191 source: "test".to_string(),
1192 user: None,
1193 context: None,
1194 caused_by: None,
1195 version: "1.0".to_string(),
1196 properties: HashMap::new(),
1197 checksum: None,
1198 },
1199 };
1200
1201 let event_no_match = StreamEvent::GraphCreated {
1202 graph: "test:graph".to_string(),
1203 metadata: EventMetadata {
1204 event_id: "test".to_string(),
1205 timestamp: chrono::Utc::now(),
1206 source: "test".to_string(),
1207 user: None,
1208 context: None,
1209 caused_by: None,
1210 version: "1.0".to_string(),
1211 properties: HashMap::new(),
1212 checksum: None,
1213 },
1214 };
1215
1216 assert!(manager.matches_filters(&event_match, std::slice::from_ref(&filter)));
1217 assert!(!manager.matches_filters(&event_no_match, std::slice::from_ref(&filter)));
1218 }
1219
1220 #[test]
1221 fn test_token_bucket() {
1222 let mut bucket = TokenBucket::new(10.0, 20);
1223
1224 assert!(bucket.consume(20.0));
1226 assert!(!bucket.consume(1.0));
1227
1228 std::thread::sleep(Duration::from_millis(100));
1230 bucket.refill();
1231 assert!(bucket.consume(1.0));
1232 }
1233}