1use async_trait::async_trait;
62use chrono::{DateTime, Utc};
63use ipnet::IpNet;
64use rand::Rng;
65use serde::{Deserialize, Serialize};
66use std::collections::HashMap;
67use std::net::IpAddr;
68use std::time::Duration;
69use thiserror::Error;
70use url::Url;
71
72use crate::TaskId;
73
/// Errors produced while validating a webhook target or delivering an event to it.
#[derive(Debug, Error)]
pub enum WebhookError {
    /// The HTTP request could not be sent, or the endpoint answered with a
    /// non-success status. The payload carries the transport error or the
    /// `HTTP <status>: <body>` summary.
    #[error("Webhook request failed: {0}")]
    RequestFailed(String),

    /// Delivery kept failing after the configured number of retries.
    #[error("Max retries exceeded for webhook")]
    MaxRetriesExceeded,

    /// The event could not be serialized to JSON.
    #[error("Webhook serialization error: {0}")]
    SerializationError(String),

    /// The configured URL could not be parsed or has no host component.
    #[error("Invalid webhook URL: {0}")]
    InvalidUrl(String),

    /// The URL uses a scheme other than `https`; the payload is the rejected scheme.
    #[error("URL scheme not allowed: {0}. Only HTTPS is permitted for webhooks")]
    SchemeNotAllowed(String),

    /// The URL targets (directly, by hostname convention, or after DNS
    /// resolution) an address that webhooks must not reach.
    #[error("Webhook URL resolves to blocked IP address: {0}")]
    BlockedIpAddress(String),

    /// The host name of the URL could not be resolved.
    #[error("DNS resolution failed for webhook URL host: {0}")]
    DnsResolutionFailed(String),
}
171
/// CIDR ranges that a webhook target must never resolve to.
///
/// The list covers addresses that are only meaningful from inside the
/// sending host or its private network, which makes them classic SSRF
/// targets. Entries are parsed as `IpNet` values when an address is checked.
const BLOCKED_IP_RANGES: &[&str] = &[
    // IPv4 "this network"; 0.0.0.0 is commonly routed to the local host.
    "0.0.0.0/8",
    // IPv4 loopback.
    "127.0.0.0/8",
    // RFC 1918 private networks.
    "10.0.0.0/8",
    "172.16.0.0/12",
    "192.168.0.0/16",
    // RFC 6598 shared address space (carrier-grade NAT); some cloud providers
    // host their metadata service inside this range.
    "100.64.0.0/10",
    // IPv4 link-local, including the 169.254.169.254 cloud metadata endpoint.
    "169.254.0.0/16",
    // IPv6 unspecified address.
    "::/128",
    // IPv6 loopback.
    "::1/128",
    // IPv6 link-local.
    "fe80::/10",
    // IPv6 unique local addresses.
    "fc00::/7",
];
195
196pub fn is_blocked_ip(ip: &IpAddr) -> bool {
219 BLOCKED_IP_RANGES.iter().any(|range| {
220 range
221 .parse::<IpNet>()
222 .map(|net| net.contains(ip))
223 .unwrap_or(false)
224 })
225}
226
227pub fn validate_webhook_url(url_str: &str) -> Result<Url, WebhookError> {
260 let parsed_url =
262 Url::parse(url_str).map_err(|e| WebhookError::InvalidUrl(format!("{}: {}", url_str, e)))?;
263
264 if parsed_url.scheme() != "https" {
266 return Err(WebhookError::SchemeNotAllowed(
267 parsed_url.scheme().to_string(),
268 ));
269 }
270
271 let host = parsed_url
273 .host_str()
274 .ok_or_else(|| WebhookError::InvalidUrl("URL has no host".to_string()))?;
275
276 let host_for_parse = host
280 .strip_prefix('[')
281 .and_then(|s| s.strip_suffix(']'))
282 .unwrap_or(host);
283
284 if let Ok(ip) = host_for_parse.parse::<IpAddr>() {
285 if is_blocked_ip(&ip) {
286 return Err(WebhookError::BlockedIpAddress(ip.to_string()));
287 }
288 return Ok(parsed_url);
289 }
290
291 let host_lower = host.to_lowercase();
294 if host_lower == "localhost" || host_lower.ends_with(".localhost") {
295 return Err(WebhookError::BlockedIpAddress("localhost".to_string()));
296 }
297
298 if host_lower.ends_with(".internal") || host_lower.ends_with(".local") {
300 return Err(WebhookError::BlockedIpAddress(format!(
301 "internal hostname: {}",
302 host
303 )));
304 }
305
306 Ok(parsed_url)
307}
308
309pub async fn validate_resolved_ips(url: &Url) -> Result<(), WebhookError> {
322 let host = url
323 .host_str()
324 .ok_or_else(|| WebhookError::InvalidUrl("URL has no host".to_string()))?;
325
326 let host_for_parse = host
329 .strip_prefix('[')
330 .and_then(|s| s.strip_suffix(']'))
331 .unwrap_or(host);
332
333 if host_for_parse.parse::<IpAddr>().is_ok() {
334 return Ok(());
335 }
336
337 let port = url.port().unwrap_or(443);
338
339 let addrs = tokio::net::lookup_host(format!("{}:{}", host, port))
341 .await
342 .map_err(|e| WebhookError::DnsResolutionFailed(format!("{}: {}", host, e)))?;
343
344 for addr in addrs {
346 if is_blocked_ip(&addr.ip()) {
347 return Err(WebhookError::BlockedIpAddress(format!(
348 "{} resolves to {}",
349 host,
350 addr.ip()
351 )));
352 }
353 }
354
355 Ok(())
356}
357
/// Final outcome of a task as reported in a webhook payload.
///
/// Serialized as a lowercase string (`"success"`, `"failed"`, `"cancelled"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
    /// Serialized as `"success"`.
    Success,
    /// Serialized as `"failed"`.
    Failed,
    /// Serialized as `"cancelled"`.
    Cancelled,
}
378
/// Payload delivered to a webhook endpoint; it is sent as a JSON document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookEvent {
    /// Identifier of the task this event describes.
    pub task_id: TaskId,
    /// Name of the task.
    pub task_name: String,
    /// Outcome of the task.
    pub status: TaskStatus,
    /// Optional result text (presumably set for successful tasks; confirm with the producer).
    pub result: Option<String>,
    /// Optional error text (presumably set for failed tasks; confirm with the producer).
    pub error: Option<String>,
    /// UTC timestamp at which the task started.
    pub started_at: DateTime<Utc>,
    /// UTC timestamp at which the task completed.
    pub completed_at: DateTime<Utc>,
    /// Task duration; the name indicates milliseconds.
    pub duration_ms: u64,
}
424
/// Retry policy applied when a webhook delivery attempt fails.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of retries after the first attempt; a delivery is tried at most
    /// `max_retries + 1` times.
    pub max_retries: u32,
    /// Base delay used for the first retry.
    pub initial_backoff: Duration,
    /// Upper bound for the delay between two attempts.
    pub max_backoff: Duration,
    /// Factor by which the delay grows with every further retry.
    pub backoff_multiplier: f64,
}

impl Default for RetryConfig {
    /// Three retries, starting at 100 ms and doubling up to a 30 s cap.
    fn default() -> Self {
        let initial_backoff = Duration::from_millis(100);
        let max_backoff = Duration::from_secs(30);

        RetryConfig {
            max_retries: 3,
            initial_backoff,
            max_backoff,
            backoff_multiplier: 2.0,
        }
    }
}
465
466#[derive(Debug, Clone)]
490pub struct WebhookConfig {
491 pub url: String,
493 pub method: String,
495 pub headers: HashMap<String, String>,
497 pub timeout: Duration,
499 pub retry_config: RetryConfig,
501}
502
503impl Default for WebhookConfig {
504 fn default() -> Self {
505 Self {
506 url: String::new(),
507 method: "POST".to_string(),
508 headers: HashMap::new(),
509 timeout: Duration::from_secs(5),
510 retry_config: RetryConfig::default(),
511 }
512 }
513}
514
/// Abstraction over anything that can deliver a [`WebhookEvent`].
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait WebhookSender: Send + Sync {
    /// Delivers `event`, returning a [`WebhookError`] when delivery fails.
    async fn send(&self, event: &WebhookEvent) -> Result<(), WebhookError>;
}
548
/// [`WebhookSender`] implementation that delivers events over HTTP using `reqwest`.
pub struct HttpWebhookSender {
    /// HTTP client used for every request issued by this sender.
    client: reqwest::Client,
    /// Destination, headers, timeout and retry policy.
    config: WebhookConfig,
}
571
572impl HttpWebhookSender {
573 pub fn new(config: WebhookConfig) -> Self {
583 let client = reqwest::Client::builder()
584 .timeout(config.timeout)
585 .build()
586 .unwrap_or_else(|_| reqwest::Client::new());
587
588 Self { client, config }
589 }
590
591 pub fn calculate_backoff(&self, retry_count: u32) -> Duration {
612 let retry_config = &self.config.retry_config;
613
614 let backoff_ms = retry_config.initial_backoff.as_millis() as f64
616 * retry_config.backoff_multiplier.powi(retry_count as i32);
617
618 let mut rng = rand::rng();
620 let jitter = rng.random_range(-0.25..=0.25);
621 let backoff_with_jitter = backoff_ms * (1.0 + jitter);
622
623 let capped_backoff = backoff_with_jitter.min(retry_config.max_backoff.as_millis() as f64);
625
626 Duration::from_millis(capped_backoff.max(0.0) as u64)
627 }
628
629 async fn send_with_retry(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
631 let mut retry_count = 0;
632 let max_retries = self.config.retry_config.max_retries;
633
634 loop {
635 match self.send_request(event).await {
636 Ok(_) => return Ok(()),
637 Err(e) => {
638 if retry_count >= max_retries {
639 return Err(WebhookError::MaxRetriesExceeded);
640 }
641
642 let backoff = self.calculate_backoff(retry_count);
643 tracing::warn!(
644 attempt = retry_count + 1,
645 max_attempts = max_retries + 1,
646 error = %e,
647 backoff = ?backoff,
648 "Webhook request failed, retrying"
649 );
650
651 tokio::time::sleep(backoff).await;
653 retry_count += 1;
654 }
655 }
656 }
657 }
658
659 async fn send_request(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
661 let json_body = serde_json::to_string(event)
662 .map_err(|e| WebhookError::SerializationError(e.to_string()))?;
663
664 let mut request = match self.config.method.to_uppercase().as_str() {
665 "POST" => self.client.post(&self.config.url),
666 "PUT" => self.client.put(&self.config.url),
667 "PATCH" => self.client.patch(&self.config.url),
668 _ => self.client.post(&self.config.url),
669 };
670
671 for (key, value) in &self.config.headers {
673 request = request.header(key, value);
674 }
675
676 let response = request
678 .header("Content-Type", "application/json")
679 .body(json_body)
680 .send()
681 .await
682 .map_err(|e| WebhookError::RequestFailed(e.to_string()))?;
683
684 if !response.status().is_success() {
686 return Err(WebhookError::RequestFailed(format!(
687 "HTTP {}: {}",
688 response.status(),
689 response
690 .text()
691 .await
692 .unwrap_or_else(|_| "No response body".to_string())
693 )));
694 }
695
696 Ok(())
697 }
698}
699
700#[async_trait]
701impl WebhookSender for HttpWebhookSender {
702 async fn send(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
703 let validated_url = validate_webhook_url(&self.config.url)?;
705 validate_resolved_ips(&validated_url).await?;
706
707 self.send_with_retry(event).await
708 }
709}
710
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;
    use std::time::Duration;

    /// Builds a successful event named `test_task` whose start and completion
    /// timestamps are the same instant.
    fn sample_event(result: Option<&str>, duration_ms: u64) -> WebhookEvent {
        let now = Utc::now();
        WebhookEvent {
            task_id: TaskId::new(),
            task_name: "test_task".to_string(),
            status: TaskStatus::Success,
            result: result.map(str::to_string),
            error: None,
            started_at: now,
            completed_at: now,
            duration_ms,
        }
    }

    /// Builds a config for `url` with the default method, headers and timeout
    /// (`POST`, none, 5 s) and a retry policy that doubles the backoff.
    fn config_for(
        url: String,
        max_retries: u32,
        initial_backoff: Duration,
        max_backoff: Duration,
    ) -> WebhookConfig {
        WebhookConfig {
            url,
            retry_config: RetryConfig {
                max_retries,
                initial_backoff,
                max_backoff,
                backoff_multiplier: 2.0,
            },
            ..WebhookConfig::default()
        }
    }

    /// Config used by the mock-server tests: 10 ms initial backoff, 1 s cap.
    fn mock_config(url: String, max_retries: u32) -> WebhookConfig {
        config_for(
            url,
            max_retries,
            Duration::from_millis(10),
            Duration::from_secs(1),
        )
    }

    #[rstest]
    fn test_task_status_serialization() {
        let status = TaskStatus::Success;

        let json = serde_json::to_string(&status).unwrap();

        assert_eq!(json, r#""success""#);

        let status: TaskStatus = serde_json::from_str(r#""failed""#).unwrap();

        assert_eq!(status, TaskStatus::Failed);
    }

    #[rstest]
    fn test_webhook_event_serialization() {
        let event = sample_event(Some("OK"), 1000);

        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains("test_task"));
        assert!(json.contains(r#""status":"success""#));

        let deserialized: WebhookEvent = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.task_name, "test_task");
        assert_eq!(deserialized.status, TaskStatus::Success);
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();

        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_backoff, Duration::from_millis(100));
        assert_eq!(config.max_backoff, Duration::from_secs(30));
        assert_eq!(config.backoff_multiplier, 2.0);
    }

    #[rstest]
    fn test_webhook_config_default() {
        let config = WebhookConfig::default();

        assert_eq!(config.url, "");
        assert_eq!(config.method, "POST");
        assert_eq!(config.timeout, Duration::from_secs(5));
        assert!(config.headers.is_empty());
    }

    #[rstest]
    fn test_calculate_backoff() {
        let config = config_for(
            "https://example.com".to_string(),
            3,
            Duration::from_millis(100),
            Duration::from_secs(10),
        );
        let sender = HttpWebhookSender::new(config);

        let backoff0 = sender.calculate_backoff(0);
        let backoff1 = sender.calculate_backoff(1);
        let backoff2 = sender.calculate_backoff(2);

        // Each bound is the nominal delay (100, 200, 400 ms) with 25% jitter.
        assert!(backoff0.as_millis() >= 75 && backoff0.as_millis() <= 125);
        assert!(backoff1.as_millis() >= 150 && backoff1.as_millis() <= 250);
        assert!(backoff2.as_millis() >= 300 && backoff2.as_millis() <= 500);

        // Very large retry counts must stay within the configured cap.
        let backoff_large = sender.calculate_backoff(100);
        assert!(backoff_large <= Duration::from_secs(10));
    }

    #[rstest]
    fn test_webhook_error_display() {
        let error = WebhookError::RequestFailed("Connection timeout".to_string());
        assert_eq!(
            error.to_string(),
            "Webhook request failed: Connection timeout"
        );

        let error = WebhookError::MaxRetriesExceeded;
        assert_eq!(error.to_string(), "Max retries exceeded for webhook");

        let error = WebhookError::SerializationError("Invalid JSON".to_string());
        assert_eq!(
            error.to_string(),
            "Webhook serialization error: Invalid JSON"
        );
    }

    #[rstest]
    #[tokio::test]
    async fn test_http_webhook_sender_creation() {
        let config = WebhookConfig::default();
        let sender = HttpWebhookSender::new(config);

        assert_eq!(sender.config.method, "POST");
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_event_creation() {
        let now = Utc::now();
        let started = now - chrono::Duration::seconds(5);

        let event = WebhookEvent {
            task_id: TaskId::new(),
            task_name: "test_task".to_string(),
            status: TaskStatus::Success,
            result: Some("Task completed successfully".to_string()),
            error: None,
            started_at: started,
            completed_at: now,
            duration_ms: 5000,
        };

        assert_eq!(event.task_name, "test_task");
        assert_eq!(event.status, TaskStatus::Success);
        assert!(event.result.is_some());
        assert!(event.error.is_none());
        assert_eq!(event.duration_ms, 5000);
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_failed_event() {
        let now = Utc::now();

        let event = WebhookEvent {
            task_id: TaskId::new(),
            task_name: "failed_task".to_string(),
            status: TaskStatus::Failed,
            result: None,
            error: Some("Database connection failed".to_string()),
            started_at: now,
            completed_at: now,
            duration_ms: 100,
        };

        assert_eq!(event.status, TaskStatus::Failed);
        assert!(event.result.is_none());
        assert!(event.error.is_some());
        assert_eq!(
            event.error.unwrap(),
            "Database connection failed".to_string()
        );
    }

    // The mock-server tests call `send_with_retry` directly: the mock server
    // listens on a plain-HTTP local address, which `send` rejects.

    #[rstest]
    #[tokio::test]
    async fn test_webhook_send_success() {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("POST", "/webhook")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"status":"ok"}"#)
            .create_async()
            .await;

        let sender = HttpWebhookSender::new(mock_config(format!("{}/webhook", server.url()), 0));
        let event = sample_event(Some("OK"), 100);

        let result = sender.send_with_retry(&event).await;

        assert!(result.is_ok());
        mock.assert_async().await;
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_send_retry_then_success() {
        let mut server = mockito::Server::new_async().await;

        let mock1 = server
            .mock("POST", "/webhook")
            .with_status(500)
            .expect(1)
            .create_async()
            .await;

        let mock2 = server
            .mock("POST", "/webhook")
            .with_status(503)
            .expect(1)
            .create_async()
            .await;

        let mock3 = server
            .mock("POST", "/webhook")
            .with_status(200)
            .expect(1)
            .create_async()
            .await;

        let sender = HttpWebhookSender::new(mock_config(format!("{}/webhook", server.url()), 3));
        let event = sample_event(Some("OK"), 100);

        let result = sender.send_with_retry(&event).await;

        assert!(result.is_ok());
        mock1.assert_async().await;
        mock2.assert_async().await;
        mock3.assert_async().await;
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_send_max_retries_exceeded() {
        let mut server = mockito::Server::new_async().await;

        // One initial attempt plus three retries.
        let mock = server
            .mock("POST", "/webhook")
            .with_status(500)
            .expect(4)
            .create_async()
            .await;

        let sender = HttpWebhookSender::new(mock_config(format!("{}/webhook", server.url()), 3));
        let event = sample_event(Some("OK"), 100);

        let result = sender.send_with_retry(&event).await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            WebhookError::MaxRetriesExceeded
        ));
        mock.assert_async().await;
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_custom_headers() {
        let mut server = mockito::Server::new_async().await;

        let mock = server
            .mock("POST", "/webhook")
            .match_header("Authorization", "Bearer test-token")
            .match_header("X-Custom-Header", "custom-value")
            .with_status(200)
            .create_async()
            .await;

        let mut headers = HashMap::new();
        headers.insert("Authorization".to_string(), "Bearer test-token".to_string());
        headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());

        let config = WebhookConfig {
            headers,
            ..mock_config(format!("{}/webhook", server.url()), 0)
        };

        let sender = HttpWebhookSender::new(config);
        let event = sample_event(Some("OK"), 100);

        let result = sender.send_with_retry(&event).await;

        assert!(result.is_ok());
        mock.assert_async().await;
    }

    #[rstest]
    #[tokio::test]
    async fn test_webhook_retry_loop_sleeps_between_retries() {
        let mut server = mockito::Server::new_async().await;

        // One initial attempt plus two retries.
        let _mock = server
            .mock("POST", "/webhook")
            .with_status(500)
            .expect(3)
            .create_async()
            .await;

        let config = config_for(
            format!("{}/webhook", server.url()),
            2,
            Duration::from_millis(50),
            Duration::from_secs(1),
        );

        let sender = HttpWebhookSender::new(config);
        let event = sample_event(None, 0);

        let start = std::time::Instant::now();
        let result = sender.send_with_retry(&event).await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(
            elapsed >= Duration::from_millis(80),
            "Expected at least 80ms delay from retry backoff sleep, got {:?}",
            elapsed
        );
    }

    #[rstest]
    #[case("127.0.0.1", true)]
    #[case("127.0.0.2", true)]
    #[case("127.255.255.255", true)]
    #[case("10.0.0.1", true)]
    #[case("10.255.255.255", true)]
    #[case("172.16.0.1", true)]
    #[case("172.31.255.255", true)]
    #[case("192.168.0.1", true)]
    #[case("192.168.255.255", true)]
    #[case("169.254.169.254", true)]
    #[case("169.254.170.2", true)]
    #[case("::1", true)]
    #[case("fe80::1", true)]
    #[case("fc00::1", true)]
    #[case("8.8.8.8", false)]
    #[case("1.1.1.1", false)]
    #[case("203.0.113.1", false)]
    #[case("2001:db8::1", false)]
    fn test_is_blocked_ip(#[case] ip_str: &str, #[case] expected: bool) {
        let ip: IpAddr = ip_str.parse().unwrap();

        let result = is_blocked_ip(&ip);

        assert_eq!(
            result, expected,
            "IP {} should be blocked={}",
            ip_str, expected
        );
    }

    #[rstest]
    #[case("https://example.com/webhook")]
    #[case("https://api.example.com/hooks/123")]
    #[case("https://hooks.slack.com/services/T00/B00/xxx")]
    fn test_validate_webhook_url_accepts_valid_urls(#[case] url: &str) {
        let result = validate_webhook_url(url);

        assert!(
            result.is_ok(),
            "URL {} should be valid: {:?}",
            url,
            result.err()
        );
    }

    #[rstest]
    #[case("http://example.com/webhook", "SchemeNotAllowed")]
    #[case("ftp://example.com/file", "SchemeNotAllowed")]
    #[case("not-a-url", "InvalidUrl")]
    #[case("https://127.0.0.1/webhook", "BlockedIpAddress")]
    #[case("https://10.0.0.1/webhook", "BlockedIpAddress")]
    #[case("https://172.16.0.1/webhook", "BlockedIpAddress")]
    #[case("https://192.168.1.1/webhook", "BlockedIpAddress")]
    #[case("https://169.254.169.254/latest/meta-data/", "BlockedIpAddress")]
    #[case("https://[::1]/webhook", "BlockedIpAddress")]
    #[case("https://[fe80::1]/webhook", "BlockedIpAddress")]
    #[case("https://[fc00::1]/webhook", "BlockedIpAddress")]
    #[case("https://localhost/webhook", "BlockedIpAddress")]
    #[case("https://sub.localhost/webhook", "BlockedIpAddress")]
    #[case("https://service.internal/webhook", "BlockedIpAddress")]
    #[case("https://printer.local/webhook", "BlockedIpAddress")]
    fn test_validate_webhook_url_rejects_unsafe_urls(
        #[case] url: &str,
        #[case] expected_error: &str,
    ) {
        let result = validate_webhook_url(url);

        assert!(result.is_err(), "URL {} should be rejected", url);
        let err = result.unwrap_err();
        let err_name = match &err {
            WebhookError::InvalidUrl(_) => "InvalidUrl",
            WebhookError::SchemeNotAllowed(_) => "SchemeNotAllowed",
            WebhookError::BlockedIpAddress(_) => "BlockedIpAddress",
            WebhookError::DnsResolutionFailed(_) => "DnsResolutionFailed",
            _ => "Other",
        };
        assert_eq!(
            err_name, expected_error,
            "URL {} should produce {} error, got: {}",
            url, expected_error, err
        );
    }

    #[rstest]
    fn test_validate_webhook_url_blocks_cloud_metadata_endpoint() {
        let metadata_urls = [
            "https://169.254.169.254/latest/meta-data/",
            "https://169.254.169.254/computeMetadata/v1/",
            "https://169.254.170.2/v2/credentials",
        ];

        for url in &metadata_urls {
            let result = validate_webhook_url(url);

            assert!(
                result.is_err(),
                "Cloud metadata URL {} should be blocked",
                url
            );
            assert!(
                matches!(result.unwrap_err(), WebhookError::BlockedIpAddress(_)),
                "Cloud metadata URL {} should produce BlockedIpAddress error",
                url
            );
        }
    }

    #[rstest]
    fn test_webhook_error_display_ssrf_variants() {
        let error = WebhookError::InvalidUrl("bad-url".to_string());
        assert_eq!(error.to_string(), "Invalid webhook URL: bad-url");

        let error = WebhookError::SchemeNotAllowed("http".to_string());
        assert_eq!(
            error.to_string(),
            "URL scheme not allowed: http. Only HTTPS is permitted for webhooks"
        );

        let error = WebhookError::BlockedIpAddress("127.0.0.1".to_string());
        assert_eq!(
            error.to_string(),
            "Webhook URL resolves to blocked IP address: 127.0.0.1"
        );

        let error = WebhookError::DnsResolutionFailed("bad.host".to_string());
        assert_eq!(
            error.to_string(),
            "DNS resolution failed for webhook URL host: bad.host"
        );
    }

    #[rstest]
    #[tokio::test]
    async fn test_send_rejects_http_url_via_ssrf_validation() {
        let config = WebhookConfig {
            url: "http://example.com/webhook".to_string(),
            ..WebhookConfig::default()
        };
        let sender = HttpWebhookSender::new(config);
        let event = sample_event(None, 0);

        let result = sender.send(&event).await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            WebhookError::SchemeNotAllowed(_)
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn test_send_rejects_private_ip_via_ssrf_validation() {
        let config = WebhookConfig {
            url: "https://192.168.1.1/webhook".to_string(),
            ..WebhookConfig::default()
        };
        let sender = HttpWebhookSender::new(config);
        let event = sample_event(None, 0);

        let result = sender.send(&event).await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            WebhookError::BlockedIpAddress(_)
        ));
    }

    // Each `min_elapsed` is below the smallest total sleep possible with 25%
    // negative jitter on every backoff, so the lower bound cannot flake.
    #[rstest]
    #[case(1, Duration::from_millis(30), Duration::from_millis(50))]
    #[case(2, Duration::from_millis(80), Duration::from_millis(50))]
    #[case(3, Duration::from_millis(200), Duration::from_millis(50))]
    #[tokio::test]
    async fn test_webhook_retry_sleep_is_called_between_attempts(
        #[case] max_retries: u32,
        #[case] min_elapsed: Duration,
        #[case] initial_backoff: Duration,
    ) {
        let mut server = mockito::Server::new_async().await;

        let _mock = server
            .mock("POST", "/webhook")
            .with_status(500)
            .expect((max_retries + 1) as usize)
            .create_async()
            .await;

        let config = config_for(
            format!("{}/webhook", server.url()),
            max_retries,
            initial_backoff,
            Duration::from_secs(1),
        );

        let sender = HttpWebhookSender::new(config);
        let event = WebhookEvent {
            task_name: "regression_742".to_string(),
            ..sample_event(None, 0)
        };

        let start = std::time::Instant::now();
        let result = sender.send_with_retry(&event).await;
        let elapsed = start.elapsed();

        assert!(
            result.is_err(),
            "expected MaxRetriesExceeded after all retries"
        );
        assert!(
            elapsed >= min_elapsed,
            "Regression #742: expected sleep between retries (>={:?}), got {:?}",
            min_elapsed,
            elapsed
        );
    }
}