Skip to main content

reinhardt_tasks/
webhook.rs

1//! Webhook notifications for task completion events
2//!
3//! This module provides webhook notification support for the Reinhardt tasks system.
4//! Webhooks are HTTP callbacks that are triggered when tasks complete, fail, or are cancelled.
5//!
6//! # Features
7//!
8//! - HTTP webhook sender with configurable retry logic
9//! - Exponential backoff with jitter for failed requests
10//! - Configurable timeout and max retries
//! - Automatic serialization of task events to JSON
//! - SSRF protection: HTTPS-only URLs and rejection of loopback, private and link-local targets
12//!
13//! # Example
14//!
15//! ```rust
16//! use reinhardt_tasks::webhook::{
17//!     WebhookConfig, RetryConfig, HttpWebhookSender, WebhookSender, WebhookEvent, TaskStatus
18//! };
19//! use std::time::Duration;
20//! use std::collections::HashMap;
21//! use chrono::Utc;
22//! use reinhardt_tasks::TaskId;
23//!
24//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
25//! // Configure webhook with retry logic
26//! let retry_config = RetryConfig {
27//!     max_retries: 3,
28//!     initial_backoff: Duration::from_millis(100),
29//!     max_backoff: Duration::from_secs(10),
30//!     backoff_multiplier: 2.0,
31//! };
32//!
33//! let config = WebhookConfig {
34//!     url: "https://example.com/webhook".to_string(),
35//!     method: "POST".to_string(),
36//!     headers: HashMap::new(),
37//!     timeout: Duration::from_secs(5),
38//!     retry_config,
39//! };
40//!
41//! let sender = HttpWebhookSender::new(config);
42//!
43//! // Create and send event
44//! let now = Utc::now();
45//! let event = WebhookEvent {
46//!     task_id: TaskId::new(),
47//!     task_name: "example_task".to_string(),
48//!     status: TaskStatus::Success,
49//!     result: Some("Task completed".to_string()),
50//!     error: None,
51//!     started_at: now - chrono::Duration::seconds(10),
52//!     completed_at: now,
53//!     duration_ms: 10000,
54//! };
55//!
56//! sender.send(&event).await?;
57//! # Ok(())
58//! # }
59//! ```
60
61use async_trait::async_trait;
62use chrono::{DateTime, Utc};
63use ipnet::IpNet;
64use rand::Rng;
65use serde::{Deserialize, Serialize};
66use std::collections::HashMap;
67use std::net::IpAddr;
68use std::time::Duration;
69use thiserror::Error;
70use url::Url;
71
72use crate::TaskId;
73
/// Webhook-related errors
///
/// The URL validation helpers in this module (`validate_webhook_url`,
/// `validate_resolved_ips`) return `InvalidUrl`, `SchemeNotAllowed`,
/// `BlockedIpAddress` and `DnsResolutionFailed`.
///
/// `RequestFailed` and `SerializationError` describe a single failed delivery
/// attempt. `HttpWebhookSender` logs them per attempt and, once its retry
/// budget is used up, reports `MaxRetriesExceeded` to the caller instead.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::WebhookError;
///
/// let error = WebhookError::RequestFailed("Network timeout".to_string());
/// assert_eq!(error.to_string(), "Webhook request failed: Network timeout");
/// ```
#[derive(Debug, Error)]
pub enum WebhookError {
	/// HTTP request failed
	///
	/// Carries either the transport error text or `HTTP <status>: <body>` for
	/// a non-success response.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::RequestFailed("Connection refused".to_string());
	/// ```
	#[error("Webhook request failed: {0}")]
	RequestFailed(String),

	/// Max retries exceeded
	///
	/// Returned after the initial attempt and every configured retry failed.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::MaxRetriesExceeded;
	/// assert_eq!(error.to_string(), "Max retries exceeded for webhook");
	/// ```
	#[error("Max retries exceeded for webhook")]
	MaxRetriesExceeded,

	/// Serialization error
	///
	/// The event could not be encoded as JSON.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::SerializationError("Invalid JSON".to_string());
	/// ```
	#[error("Webhook serialization error: {0}")]
	SerializationError(String),

	/// Invalid URL format
	///
	/// The URL could not be parsed or has no host component.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::InvalidUrl("not-a-url".to_string());
	/// ```
	#[error("Invalid webhook URL: {0}")]
	InvalidUrl(String),

	/// URL scheme not allowed (only HTTPS is permitted)
	///
	/// The payload is the rejected scheme.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::SchemeNotAllowed("http".to_string());
	/// ```
	#[error("URL scheme not allowed: {0}. Only HTTPS is permitted for webhooks")]
	SchemeNotAllowed(String),

	/// SSRF protection: URL resolves to blocked IP address
	///
	/// Also used for hostnames rejected by pattern (for example `localhost`),
	/// in which case the payload describes the hostname rather than an IP.
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::BlockedIpAddress("127.0.0.1".to_string());
	/// ```
	#[error("Webhook URL resolves to blocked IP address: {0}")]
	BlockedIpAddress(String),

	/// DNS resolution failed
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_tasks::webhook::WebhookError;
	///
	/// let error = WebhookError::DnsResolutionFailed("example.invalid".to_string());
	/// ```
	#[error("DNS resolution failed for webhook URL host: {0}")]
	DnsResolutionFailed(String),
}
171
/// SSRF protection: blocked IP address ranges
///
/// These ranges are blocked to prevent Server-Side Request Forgery attacks:
/// - "This network" / unspecified addresses (0.0.0.0/8, ::/128), which many
///   hosts route to the local machine
/// - Loopback addresses (127.0.0.0/8, ::1/128)
/// - Private IPv4 ranges (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)
/// - Carrier-grade NAT shared address space (100.64.0.0/10), which is not
///   publicly routable and is used for internal services by some providers
/// - Link-local addresses (169.254.0.0/16, fe80::/10)
/// - Cloud metadata endpoints (169.254.169.254/32 for AWS/GCP/Azure)
/// - IPv6 unique local addresses (fc00::/7)
///
/// Every entry must be valid CIDR notation; entries are parsed at lookup time.
const BLOCKED_IP_RANGES: &[&str] = &[
	// IPv4 "this network" (0.0.0.0 commonly reaches the local host)
	"0.0.0.0/8",
	// IPv4 loopback
	"127.0.0.0/8",
	// IPv4 private ranges
	"10.0.0.0/8",
	"172.16.0.0/12",
	"192.168.0.0/16",
	// IPv4 shared address space (carrier-grade NAT)
	"100.64.0.0/10",
	// IPv4 link-local (includes cloud metadata at 169.254.169.254)
	"169.254.0.0/16",
	// IPv6 unspecified
	"::/128",
	// IPv6 loopback
	"::1/128",
	// IPv6 link-local
	"fe80::/10",
	// IPv6 unique local (private)
	"fc00::/7",
];
195
196/// Check if an IP address is in a blocked range for SSRF protection.
197///
198/// # Arguments
199///
200/// * `ip` - The IP address to check
201///
202/// # Returns
203///
204/// `true` if the IP is in a blocked range, `false` otherwise
205///
206/// # Example
207///
208/// ```rust
209/// use reinhardt_tasks::webhook::is_blocked_ip;
210/// use std::net::IpAddr;
211///
212/// let loopback: IpAddr = "127.0.0.1".parse().unwrap();
213/// assert!(is_blocked_ip(&loopback));
214///
215/// let public: IpAddr = "8.8.8.8".parse().unwrap();
216/// assert!(!is_blocked_ip(&public));
217/// ```
218pub fn is_blocked_ip(ip: &IpAddr) -> bool {
219	BLOCKED_IP_RANGES.iter().any(|range| {
220		range
221			.parse::<IpNet>()
222			.map(|net| net.contains(ip))
223			.unwrap_or(false)
224	})
225}
226
227/// Validate a webhook URL for SSRF protection.
228///
229/// This function performs the following checks:
230/// 1. URL must be parseable
231/// 2. URL scheme must be HTTPS
232/// 3. URL hostname must resolve to a non-blocked IP address
233///
234/// # Arguments
235///
236/// * `url_str` - The URL string to validate
237///
238/// # Returns
239///
240/// `Ok(Url)` if the URL is valid and safe, `Err(WebhookError)` otherwise
241///
242/// # Example
243///
244/// ```rust
245/// use reinhardt_tasks::webhook::validate_webhook_url;
246///
247/// // Valid public HTTPS URL
248/// let result = validate_webhook_url("https://example.com/webhook");
249/// assert!(result.is_ok());
250///
251/// // Invalid: HTTP scheme
252/// let result = validate_webhook_url("http://example.com/webhook");
253/// assert!(result.is_err());
254///
255/// // Invalid: Private IP
256/// let result = validate_webhook_url("https://192.168.1.1/webhook");
257/// assert!(result.is_err());
258/// ```
259pub fn validate_webhook_url(url_str: &str) -> Result<Url, WebhookError> {
260	// Parse the URL
261	let parsed_url =
262		Url::parse(url_str).map_err(|e| WebhookError::InvalidUrl(format!("{}: {}", url_str, e)))?;
263
264	// Check scheme - only HTTPS is allowed
265	if parsed_url.scheme() != "https" {
266		return Err(WebhookError::SchemeNotAllowed(
267			parsed_url.scheme().to_string(),
268		));
269	}
270
271	// Get the host
272	let host = parsed_url
273		.host_str()
274		.ok_or_else(|| WebhookError::InvalidUrl("URL has no host".to_string()))?;
275
276	// Check if host is an IP address directly
277	// Note: host_str() returns IPv6 addresses with brackets (e.g., "[::1]")
278	// so we need to strip them before parsing
279	let host_for_parse = host
280		.strip_prefix('[')
281		.and_then(|s| s.strip_suffix(']'))
282		.unwrap_or(host);
283
284	if let Ok(ip) = host_for_parse.parse::<IpAddr>() {
285		if is_blocked_ip(&ip) {
286			return Err(WebhookError::BlockedIpAddress(ip.to_string()));
287		}
288		return Ok(parsed_url);
289	}
290
291	// For hostnames, we need to resolve DNS (synchronously check common patterns)
292	// First check for localhost-like patterns
293	let host_lower = host.to_lowercase();
294	if host_lower == "localhost" || host_lower.ends_with(".localhost") {
295		return Err(WebhookError::BlockedIpAddress("localhost".to_string()));
296	}
297
298	// Check for internal hostname patterns (common in cloud environments)
299	if host_lower.ends_with(".internal") || host_lower.ends_with(".local") {
300		return Err(WebhookError::BlockedIpAddress(format!(
301			"internal hostname: {}",
302			host
303		)));
304	}
305
306	Ok(parsed_url)
307}
308
309/// Asynchronously resolve a hostname and validate all resolved IP addresses.
310///
311/// This function performs DNS resolution and checks each resolved IP address
312/// against the blocked ranges.
313///
314/// # Arguments
315///
316/// * `url` - The parsed URL to validate
317///
318/// # Returns
319///
320/// `Ok(())` if all resolved IPs are safe, `Err(WebhookError)` if any IP is blocked
321pub async fn validate_resolved_ips(url: &Url) -> Result<(), WebhookError> {
322	let host = url
323		.host_str()
324		.ok_or_else(|| WebhookError::InvalidUrl("URL has no host".to_string()))?;
325
326	// Skip DNS resolution for IP addresses (already validated)
327	// Note: host_str() returns IPv6 addresses with brackets (e.g., "[::1]")
328	let host_for_parse = host
329		.strip_prefix('[')
330		.and_then(|s| s.strip_suffix(']'))
331		.unwrap_or(host);
332
333	if host_for_parse.parse::<IpAddr>().is_ok() {
334		return Ok(());
335	}
336
337	let port = url.port().unwrap_or(443);
338
339	// Perform DNS resolution
340	let addrs = tokio::net::lookup_host(format!("{}:{}", host, port))
341		.await
342		.map_err(|e| WebhookError::DnsResolutionFailed(format!("{}: {}", host, e)))?;
343
344	// Check each resolved IP address
345	for addr in addrs {
346		if is_blocked_ip(&addr.ip()) {
347			return Err(WebhookError::BlockedIpAddress(format!(
348				"{} resolves to {}",
349				host,
350				addr.ip()
351			)));
352		}
353	}
354
355	Ok(())
356}
357
/// Task status for webhook events
///
/// Describes how a task ended. Because of `#[serde(rename_all = "lowercase")]`
/// the variants are serialized as the lowercase strings `"success"`,
/// `"failed"` and `"cancelled"`.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::TaskStatus;
///
/// let status = TaskStatus::Success;
/// assert_eq!(status, TaskStatus::Success);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
	/// Task completed successfully
	Success,
	/// Task failed with an error
	Failed,
	/// Task was cancelled
	Cancelled,
}
378
/// Webhook event payload
///
/// Contains all information about a completed task for webhook notification.
/// `HttpWebhookSender` serializes this struct to JSON and sends it as the
/// request body, so the field names below are the JSON keys receivers see.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::{WebhookEvent, TaskStatus};
/// use reinhardt_tasks::TaskId;
/// use chrono::Utc;
///
/// let now = Utc::now();
/// let event = WebhookEvent {
///     task_id: TaskId::new(),
///     task_name: "send_email".to_string(),
///     status: TaskStatus::Success,
///     result: Some("Email sent".to_string()),
///     error: None,
///     started_at: now - chrono::Duration::seconds(5),
///     completed_at: now,
///     duration_ms: 5000,
/// };
///
/// assert_eq!(event.status, TaskStatus::Success);
/// assert_eq!(event.duration_ms, 5000);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookEvent {
	/// Unique task identifier
	pub task_id: TaskId,
	/// Task name
	pub task_name: String,
	/// Task completion status
	pub status: TaskStatus,
	/// Task result (if successful)
	pub result: Option<String>,
	/// Error message (if failed)
	pub error: Option<String>,
	/// Task start time (UTC)
	pub started_at: DateTime<Utc>,
	/// Task completion time (UTC)
	pub completed_at: DateTime<Utc>,
	/// Task duration in milliseconds, as supplied by whoever builds the event
	pub duration_ms: u64,
}
424
/// Retry configuration for webhook requests
///
/// Controls how `HttpWebhookSender` retries a failed delivery. The delay
/// before retry number `n` (starting at 0) is
/// `initial_backoff * backoff_multiplier^n`, varied by ±25% jitter and then
/// capped at `max_backoff`.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::RetryConfig;
/// use std::time::Duration;
///
/// let config = RetryConfig {
///     max_retries: 3,
///     initial_backoff: Duration::from_millis(100),
///     max_backoff: Duration::from_secs(10),
///     backoff_multiplier: 2.0,
/// };
///
/// assert_eq!(config.max_retries, 3);
/// assert_eq!(config.backoff_multiplier, 2.0);
/// ```
#[derive(Debug, Clone)]
pub struct RetryConfig {
	/// Maximum number of retry attempts made after the initial attempt
	/// (so at most `max_retries + 1` requests are sent; `0` disables retries)
	pub max_retries: u32,
	/// Initial backoff duration (delay before the first retry, before jitter)
	pub initial_backoff: Duration,
	/// Maximum backoff duration (upper bound applied after jitter)
	pub max_backoff: Duration,
	/// Backoff multiplier for exponential backoff
	pub backoff_multiplier: f64,
}
454
455impl Default for RetryConfig {
456	fn default() -> Self {
457		Self {
458			max_retries: 3,
459			initial_backoff: Duration::from_millis(100),
460			max_backoff: Duration::from_secs(30),
461			backoff_multiplier: 2.0,
462		}
463	}
464}
465
/// Webhook configuration
///
/// Describes where and how `HttpWebhookSender` delivers events.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::{WebhookConfig, RetryConfig};
/// use std::time::Duration;
/// use std::collections::HashMap;
///
/// let mut headers = HashMap::new();
/// headers.insert("Authorization".to_string(), "Bearer token123".to_string());
///
/// let config = WebhookConfig {
///     url: "https://api.example.com/webhooks".to_string(),
///     method: "POST".to_string(),
///     headers,
///     timeout: Duration::from_secs(5),
///     retry_config: RetryConfig::default(),
/// };
///
/// assert_eq!(config.url, "https://api.example.com/webhooks");
/// assert_eq!(config.timeout, Duration::from_secs(5));
/// ```
#[derive(Debug, Clone)]
pub struct WebhookConfig {
	/// Webhook URL. It is validated on every send: it must use HTTPS and must
	/// not point at a blocked address. The `Default` value is an empty string
	/// and has to be replaced before sending.
	pub url: String,
	/// HTTP method (e.g., "POST", "PUT"). Matched case-insensitively against
	/// POST, PUT and PATCH; any other value is sent as POST.
	pub method: String,
	/// Additional HTTP headers added to every request
	pub headers: HashMap<String, String>,
	/// Request timeout, applied to the underlying HTTP client
	pub timeout: Duration,
	/// Retry configuration
	pub retry_config: RetryConfig,
}
502
503impl Default for WebhookConfig {
504	fn default() -> Self {
505		Self {
506			url: String::new(),
507			method: "POST".to_string(),
508			headers: HashMap::new(),
509			timeout: Duration::from_secs(5),
510			retry_config: RetryConfig::default(),
511		}
512	}
513}
514
/// Trait for webhook senders
///
/// Abstracts over the transport used to deliver a [`WebhookEvent`].
/// Implementations must be `Send + Sync`, as required by the trait bounds.
///
/// # Example
///
/// ```rust,no_run
/// use reinhardt_tasks::webhook::{WebhookSender, WebhookEvent, TaskStatus, HttpWebhookSender, WebhookConfig};
/// use reinhardt_tasks::TaskId;
/// use chrono::Utc;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let sender = HttpWebhookSender::new(WebhookConfig::default());
///
/// let now = Utc::now();
/// let event = WebhookEvent {
///     task_id: TaskId::new(),
///     task_name: "test_task".to_string(),
///     status: TaskStatus::Success,
///     result: Some("OK".to_string()),
///     error: None,
///     started_at: now,
///     completed_at: now,
///     duration_ms: 0,
/// };
///
/// sender.send(&event).await?;
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait WebhookSender: Send + Sync {
	/// Send a webhook event
	///
	/// Returns `Ok(())` once the event has been delivered and a
	/// [`WebhookError`] describing the failure otherwise.
	async fn send(&self, event: &WebhookEvent) -> Result<(), WebhookError>;
}
548
/// HTTP webhook sender with retry logic
///
/// Delivers events as JSON over HTTP using a `reqwest` client that is built
/// once in `new` and reused for every request.
///
/// # Example
///
/// ```rust
/// use reinhardt_tasks::webhook::{HttpWebhookSender, WebhookConfig, RetryConfig};
/// use std::time::Duration;
///
/// let config = WebhookConfig {
///     url: "https://example.com/webhook".to_string(),
///     method: "POST".to_string(),
///     headers: Default::default(),
///     timeout: Duration::from_secs(5),
///     retry_config: RetryConfig::default(),
/// };
///
/// let sender = HttpWebhookSender::new(config);
/// ```
pub struct HttpWebhookSender {
	// HTTP client, configured with `config.timeout` at construction time.
	client: reqwest::Client,
	// Target URL, method, headers and retry policy used for every send.
	config: WebhookConfig,
}
571
572impl HttpWebhookSender {
573	/// Create a new HTTP webhook sender
574	///
575	/// # Example
576	///
577	/// ```rust
578	/// use reinhardt_tasks::webhook::{HttpWebhookSender, WebhookConfig};
579	///
580	/// let sender = HttpWebhookSender::new(WebhookConfig::default());
581	/// ```
582	pub fn new(config: WebhookConfig) -> Self {
583		let client = reqwest::Client::builder()
584			.timeout(config.timeout)
585			.build()
586			.unwrap_or_else(|_| reqwest::Client::new());
587
588		Self { client, config }
589	}
590
591	/// Calculate backoff duration with exponential backoff and jitter
592	///
593	/// # Example
594	///
595	/// ```rust
596	/// use reinhardt_tasks::webhook::{HttpWebhookSender, WebhookConfig, RetryConfig};
597	/// use std::time::Duration;
598	///
599	/// let config = WebhookConfig {
600	///     url: "https://example.com".to_string(),
601	///     method: "POST".to_string(),
602	///     headers: Default::default(),
603	///     timeout: Duration::from_secs(5),
604	///     retry_config: RetryConfig::default(),
605	/// };
606	///
607	/// let sender = HttpWebhookSender::new(config);
608	/// let backoff = sender.calculate_backoff(2);
609	/// assert!(backoff > Duration::from_millis(0));
610	/// ```
611	pub fn calculate_backoff(&self, retry_count: u32) -> Duration {
612		let retry_config = &self.config.retry_config;
613
614		// Calculate base backoff with exponential growth
615		let backoff_ms = retry_config.initial_backoff.as_millis() as f64
616			* retry_config.backoff_multiplier.powi(retry_count as i32);
617
618		// Add jitter (±25%)
619		let mut rng = rand::rng();
620		let jitter = rng.random_range(-0.25..=0.25);
621		let backoff_with_jitter = backoff_ms * (1.0 + jitter);
622
623		// Cap at max backoff (AFTER jitter)
624		let capped_backoff = backoff_with_jitter.min(retry_config.max_backoff.as_millis() as f64);
625
626		Duration::from_millis(capped_backoff.max(0.0) as u64)
627	}
628
629	/// Send webhook request with retry logic
630	async fn send_with_retry(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
631		let mut retry_count = 0;
632		let max_retries = self.config.retry_config.max_retries;
633
634		loop {
635			match self.send_request(event).await {
636				Ok(_) => return Ok(()),
637				Err(e) => {
638					if retry_count >= max_retries {
639						return Err(WebhookError::MaxRetriesExceeded);
640					}
641
642					let backoff = self.calculate_backoff(retry_count);
643					tracing::warn!(
644						attempt = retry_count + 1,
645						max_attempts = max_retries + 1,
646						error = %e,
647						backoff = ?backoff,
648						"Webhook request failed, retrying"
649					);
650
651					// Wait before retrying to avoid tight retry loops
652					tokio::time::sleep(backoff).await;
653					retry_count += 1;
654				}
655			}
656		}
657	}
658
659	/// Send a single webhook request
660	async fn send_request(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
661		let json_body = serde_json::to_string(event)
662			.map_err(|e| WebhookError::SerializationError(e.to_string()))?;
663
664		let mut request = match self.config.method.to_uppercase().as_str() {
665			"POST" => self.client.post(&self.config.url),
666			"PUT" => self.client.put(&self.config.url),
667			"PATCH" => self.client.patch(&self.config.url),
668			_ => self.client.post(&self.config.url),
669		};
670
671		// Add headers
672		for (key, value) in &self.config.headers {
673			request = request.header(key, value);
674		}
675
676		// Send request
677		let response = request
678			.header("Content-Type", "application/json")
679			.body(json_body)
680			.send()
681			.await
682			.map_err(|e| WebhookError::RequestFailed(e.to_string()))?;
683
684		// Check response status
685		if !response.status().is_success() {
686			return Err(WebhookError::RequestFailed(format!(
687				"HTTP {}: {}",
688				response.status(),
689				response
690					.text()
691					.await
692					.unwrap_or_else(|_| "No response body".to_string())
693			)));
694		}
695
696		Ok(())
697	}
698}
699
700#[async_trait]
701impl WebhookSender for HttpWebhookSender {
702	async fn send(&self, event: &WebhookEvent) -> Result<(), WebhookError> {
703		// Validate URL for SSRF protection before making any requests
704		let validated_url = validate_webhook_url(&self.config.url)?;
705		validate_resolved_ips(&validated_url).await?;
706
707		self.send_with_retry(event).await
708	}
709}
710
711#[cfg(test)]
712mod tests {
713	use super::*;
714	use rstest::rstest;
715	use std::time::Duration;
716
717	#[rstest]
718	fn test_task_status_serialization() {
719		// Arrange
720		let status = TaskStatus::Success;
721
722		// Act
723		let json = serde_json::to_string(&status).unwrap();
724
725		// Assert
726		assert_eq!(json, r#""success""#);
727
728		// Act
729		let status: TaskStatus = serde_json::from_str(r#""failed""#).unwrap();
730
731		// Assert
732		assert_eq!(status, TaskStatus::Failed);
733	}
734
735	#[rstest]
736	fn test_webhook_event_serialization() {
737		// Arrange
738		let now = Utc::now();
739		let event = WebhookEvent {
740			task_id: TaskId::new(),
741			task_name: "test_task".to_string(),
742			status: TaskStatus::Success,
743			result: Some("OK".to_string()),
744			error: None,
745			started_at: now,
746			completed_at: now,
747			duration_ms: 1000,
748		};
749
750		// Act
751		let json = serde_json::to_string(&event).unwrap();
752
753		// Assert
754		assert!(json.contains("test_task"));
755		assert!(json.contains(r#""status":"success""#));
756
757		// Act
758		let deserialized: WebhookEvent = serde_json::from_str(&json).unwrap();
759
760		// Assert
761		assert_eq!(deserialized.task_name, "test_task");
762		assert_eq!(deserialized.status, TaskStatus::Success);
763	}
764
765	#[rstest]
766	fn test_retry_config_default() {
767		// Arrange & Act
768		let config = RetryConfig::default();
769
770		// Assert
771		assert_eq!(config.max_retries, 3);
772		assert_eq!(config.initial_backoff, Duration::from_millis(100));
773		assert_eq!(config.max_backoff, Duration::from_secs(30));
774		assert_eq!(config.backoff_multiplier, 2.0);
775	}
776
777	#[rstest]
778	fn test_webhook_config_default() {
779		// Arrange & Act
780		let config = WebhookConfig::default();
781
782		// Assert
783		assert_eq!(config.url, "");
784		assert_eq!(config.method, "POST");
785		assert_eq!(config.timeout, Duration::from_secs(5));
786		assert!(config.headers.is_empty());
787	}
788
789	#[rstest]
790	fn test_calculate_backoff() {
791		// Arrange
792		let config = WebhookConfig {
793			url: "https://example.com".to_string(),
794			method: "POST".to_string(),
795			headers: HashMap::new(),
796			timeout: Duration::from_secs(5),
797			retry_config: RetryConfig {
798				max_retries: 3,
799				initial_backoff: Duration::from_millis(100),
800				max_backoff: Duration::from_secs(10),
801				backoff_multiplier: 2.0,
802			},
803		};
804		let sender = HttpWebhookSender::new(config);
805
806		// Act - test exponential backoff
807		let backoff0 = sender.calculate_backoff(0);
808		let backoff1 = sender.calculate_backoff(1);
809		let backoff2 = sender.calculate_backoff(2);
810
811		// Assert - verify exponential growth (accounting for jitter)
812		assert!(backoff0.as_millis() >= 75 && backoff0.as_millis() <= 125); // ~100ms +/-25%
813		assert!(backoff1.as_millis() >= 150 && backoff1.as_millis() <= 250); // ~200ms +/-25%
814		assert!(backoff2.as_millis() >= 300 && backoff2.as_millis() <= 500); // ~400ms +/-25%
815
816		// Act & Assert - test max backoff cap
817		let backoff_large = sender.calculate_backoff(100);
818		assert!(backoff_large <= Duration::from_secs(10));
819	}
820
821	#[rstest]
822	fn test_webhook_error_display() {
823		// Arrange & Act & Assert
824		let error = WebhookError::RequestFailed("Connection timeout".to_string());
825		assert_eq!(
826			error.to_string(),
827			"Webhook request failed: Connection timeout"
828		);
829
830		let error = WebhookError::MaxRetriesExceeded;
831		assert_eq!(error.to_string(), "Max retries exceeded for webhook");
832
833		let error = WebhookError::SerializationError("Invalid JSON".to_string());
834		assert_eq!(
835			error.to_string(),
836			"Webhook serialization error: Invalid JSON"
837		);
838	}
839
840	#[rstest]
841	#[tokio::test]
842	async fn test_http_webhook_sender_creation() {
843		// Arrange & Act
844		let config = WebhookConfig::default();
845		let sender = HttpWebhookSender::new(config);
846
847		// Assert
848		assert_eq!(sender.config.method, "POST");
849	}
850
851	#[rstest]
852	#[tokio::test]
853	async fn test_webhook_event_creation() {
854		// Arrange
855		let now = Utc::now();
856		let started = now - chrono::Duration::seconds(5);
857
858		// Act
859		let event = WebhookEvent {
860			task_id: TaskId::new(),
861			task_name: "test_task".to_string(),
862			status: TaskStatus::Success,
863			result: Some("Task completed successfully".to_string()),
864			error: None,
865			started_at: started,
866			completed_at: now,
867			duration_ms: 5000,
868		};
869
870		// Assert
871		assert_eq!(event.task_name, "test_task");
872		assert_eq!(event.status, TaskStatus::Success);
873		assert!(event.result.is_some());
874		assert!(event.error.is_none());
875		assert_eq!(event.duration_ms, 5000);
876	}
877
878	#[rstest]
879	#[tokio::test]
880	async fn test_webhook_failed_event() {
881		// Arrange
882		let now = Utc::now();
883
884		// Act
885		let event = WebhookEvent {
886			task_id: TaskId::new(),
887			task_name: "failed_task".to_string(),
888			status: TaskStatus::Failed,
889			result: None,
890			error: Some("Database connection failed".to_string()),
891			started_at: now,
892			completed_at: now,
893			duration_ms: 100,
894		};
895
896		// Assert
897		assert_eq!(event.status, TaskStatus::Failed);
898		assert!(event.result.is_none());
899		assert!(event.error.is_some());
900		assert_eq!(
901			event.error.unwrap(),
902			"Database connection failed".to_string()
903		);
904	}
905
906	// Integration test with mock HTTP server.
907	// NOTE: These tests use send_with_retry directly because mockito servers
908	// use HTTP on localhost, which is intentionally blocked by SSRF validation.
909	// SSRF validation is tested separately below.
910	#[rstest]
911	#[tokio::test]
912	async fn test_webhook_send_success() {
913		// Arrange
914		let mut server = mockito::Server::new_async().await;
915		let mock = server
916			.mock("POST", "/webhook")
917			.with_status(200)
918			.with_header("content-type", "application/json")
919			.with_body(r#"{"status":"ok"}"#)
920			.create_async()
921			.await;
922
923		let config = WebhookConfig {
924			url: format!("{}/webhook", server.url()),
925			method: "POST".to_string(),
926			headers: HashMap::new(),
927			timeout: Duration::from_secs(5),
928			retry_config: RetryConfig {
929				max_retries: 0,
930				initial_backoff: Duration::from_millis(10),
931				max_backoff: Duration::from_secs(1),
932				backoff_multiplier: 2.0,
933			},
934		};
935
936		let sender = HttpWebhookSender::new(config);
937
938		let now = Utc::now();
939		let event = WebhookEvent {
940			task_id: TaskId::new(),
941			task_name: "test_task".to_string(),
942			status: TaskStatus::Success,
943			result: Some("OK".to_string()),
944			error: None,
945			started_at: now,
946			completed_at: now,
947			duration_ms: 100,
948		};
949
950		// Act
951		let result = sender.send_with_retry(&event).await;
952
953		// Assert
954		assert!(result.is_ok());
955		mock.assert_async().await;
956	}
957
958	#[rstest]
959	#[tokio::test]
960	async fn test_webhook_send_retry_then_success() {
961		// Arrange
962		let mut server = mockito::Server::new_async().await;
963
964		// First two requests fail, third succeeds
965		let mock1 = server
966			.mock("POST", "/webhook")
967			.with_status(500)
968			.expect(1)
969			.create_async()
970			.await;
971
972		let mock2 = server
973			.mock("POST", "/webhook")
974			.with_status(503)
975			.expect(1)
976			.create_async()
977			.await;
978
979		let mock3 = server
980			.mock("POST", "/webhook")
981			.with_status(200)
982			.expect(1)
983			.create_async()
984			.await;
985
986		let config = WebhookConfig {
987			url: format!("{}/webhook", server.url()),
988			method: "POST".to_string(),
989			headers: HashMap::new(),
990			timeout: Duration::from_secs(5),
991			retry_config: RetryConfig {
992				max_retries: 3,
993				initial_backoff: Duration::from_millis(10),
994				max_backoff: Duration::from_secs(1),
995				backoff_multiplier: 2.0,
996			},
997		};
998
999		let sender = HttpWebhookSender::new(config);
1000
1001		let now = Utc::now();
1002		let event = WebhookEvent {
1003			task_id: TaskId::new(),
1004			task_name: "test_task".to_string(),
1005			status: TaskStatus::Success,
1006			result: Some("OK".to_string()),
1007			error: None,
1008			started_at: now,
1009			completed_at: now,
1010			duration_ms: 100,
1011		};
1012
1013		// Act
1014		let result = sender.send_with_retry(&event).await;
1015
1016		// Assert
1017		assert!(result.is_ok());
1018		mock1.assert_async().await;
1019		mock2.assert_async().await;
1020		mock3.assert_async().await;
1021	}
1022
1023	#[rstest]
1024	#[tokio::test]
1025	async fn test_webhook_send_max_retries_exceeded() {
1026		// Arrange
1027		let mut server = mockito::Server::new_async().await;
1028
1029		// All requests fail
1030		let mock = server
1031			.mock("POST", "/webhook")
1032			.with_status(500)
1033			.expect(4) // Initial + 3 retries
1034			.create_async()
1035			.await;
1036
1037		let config = WebhookConfig {
1038			url: format!("{}/webhook", server.url()),
1039			method: "POST".to_string(),
1040			headers: HashMap::new(),
1041			timeout: Duration::from_secs(5),
1042			retry_config: RetryConfig {
1043				max_retries: 3,
1044				initial_backoff: Duration::from_millis(10),
1045				max_backoff: Duration::from_secs(1),
1046				backoff_multiplier: 2.0,
1047			},
1048		};
1049
1050		let sender = HttpWebhookSender::new(config);
1051
1052		let now = Utc::now();
1053		let event = WebhookEvent {
1054			task_id: TaskId::new(),
1055			task_name: "test_task".to_string(),
1056			status: TaskStatus::Success,
1057			result: Some("OK".to_string()),
1058			error: None,
1059			started_at: now,
1060			completed_at: now,
1061			duration_ms: 100,
1062		};
1063
1064		// Act
1065		let result = sender.send_with_retry(&event).await;
1066
1067		// Assert
1068		assert!(result.is_err());
1069		assert!(matches!(
1070			result.unwrap_err(),
1071			WebhookError::MaxRetriesExceeded
1072		));
1073		mock.assert_async().await;
1074	}
1075
1076	#[rstest]
1077	#[tokio::test]
1078	async fn test_webhook_custom_headers() {
1079		// Arrange
1080		let mut server = mockito::Server::new_async().await;
1081
1082		let mock = server
1083			.mock("POST", "/webhook")
1084			.match_header("Authorization", "Bearer test-token")
1085			.match_header("X-Custom-Header", "custom-value")
1086			.with_status(200)
1087			.create_async()
1088			.await;
1089
1090		let mut headers = HashMap::new();
1091		headers.insert("Authorization".to_string(), "Bearer test-token".to_string());
1092		headers.insert("X-Custom-Header".to_string(), "custom-value".to_string());
1093
1094		let config = WebhookConfig {
1095			url: format!("{}/webhook", server.url()),
1096			method: "POST".to_string(),
1097			headers,
1098			timeout: Duration::from_secs(5),
1099			retry_config: RetryConfig {
1100				max_retries: 0,
1101				initial_backoff: Duration::from_millis(10),
1102				max_backoff: Duration::from_secs(1),
1103				backoff_multiplier: 2.0,
1104			},
1105		};
1106
1107		let sender = HttpWebhookSender::new(config);
1108
1109		let now = Utc::now();
1110		let event = WebhookEvent {
1111			task_id: TaskId::new(),
1112			task_name: "test_task".to_string(),
1113			status: TaskStatus::Success,
1114			result: Some("OK".to_string()),
1115			error: None,
1116			started_at: now,
1117			completed_at: now,
1118			duration_ms: 100,
1119		};
1120
1121		// Act
1122		let result = sender.send_with_retry(&event).await;
1123
1124		// Assert
1125		assert!(result.is_ok());
1126		mock.assert_async().await;
1127	}
1128
1129	#[rstest]
1130	#[tokio::test]
1131	async fn test_webhook_retry_loop_sleeps_between_retries() {
1132		// Arrange - verify that the retry loop actually sleeps (using backoff delay)
1133		// between failed attempts, preventing a tight CPU-spinning retry loop.
1134		let mut server = mockito::Server::new_async().await;
1135
1136		// All requests fail so we go through all retries
1137		let _mock = server
1138			.mock("POST", "/webhook")
1139			.with_status(500)
1140			.expect(3) // Initial + 2 retries
1141			.create_async()
1142			.await;
1143
1144		let config = WebhookConfig {
1145			url: format!("{}/webhook", server.url()),
1146			method: "POST".to_string(),
1147			headers: HashMap::new(),
1148			timeout: Duration::from_secs(5),
1149			retry_config: RetryConfig {
1150				max_retries: 2,
1151				initial_backoff: Duration::from_millis(50),
1152				max_backoff: Duration::from_secs(1),
1153				backoff_multiplier: 2.0,
1154			},
1155		};
1156
1157		let sender = HttpWebhookSender::new(config);
1158
1159		let now = Utc::now();
1160		let event = WebhookEvent {
1161			task_id: TaskId::new(),
1162			task_name: "test_task".to_string(),
1163			status: TaskStatus::Success,
1164			result: None,
1165			error: None,
1166			started_at: now,
1167			completed_at: now,
1168			duration_ms: 0,
1169		};
1170
1171		// Act - measure elapsed time to verify sleep actually occurs
1172		let start = std::time::Instant::now();
1173		let result = sender.send_with_retry(&event).await;
1174		let elapsed = start.elapsed();
1175
1176		// Assert - with 2 retries at 50ms and 100ms backoff (plus jitter),
1177		// total sleep should be at least ~100ms. Without the sleep call,
1178		// elapsed would be near-zero (only network round-trip time).
1179		assert!(result.is_err());
1180		assert!(
1181			elapsed >= Duration::from_millis(80),
1182			"Expected at least 80ms delay from retry backoff sleep, got {:?}",
1183			elapsed
1184		);
1185	}
1186
1187	// SSRF protection tests
1188
1189	#[rstest]
1190	#[case("127.0.0.1", true)]
1191	#[case("127.0.0.2", true)]
1192	#[case("127.255.255.255", true)]
1193	#[case("10.0.0.1", true)]
1194	#[case("10.255.255.255", true)]
1195	#[case("172.16.0.1", true)]
1196	#[case("172.31.255.255", true)]
1197	#[case("192.168.0.1", true)]
1198	#[case("192.168.255.255", true)]
1199	#[case("169.254.169.254", true)]
1200	#[case("169.254.170.2", true)]
1201	#[case("::1", true)]
1202	#[case("fe80::1", true)]
1203	#[case("fc00::1", true)]
1204	#[case("8.8.8.8", false)]
1205	#[case("1.1.1.1", false)]
1206	#[case("203.0.113.1", false)]
1207	#[case("2001:db8::1", false)]
1208	fn test_is_blocked_ip(#[case] ip_str: &str, #[case] expected: bool) {
1209		// Arrange
1210		let ip: IpAddr = ip_str.parse().unwrap();
1211
1212		// Act
1213		let result = is_blocked_ip(&ip);
1214
1215		// Assert
1216		assert_eq!(
1217			result, expected,
1218			"IP {} should be blocked={}",
1219			ip_str, expected
1220		);
1221	}
1222
1223	#[rstest]
1224	#[case("https://example.com/webhook", true)]
1225	#[case("https://api.example.com/hooks/123", true)]
1226	#[case("https://hooks.slack.com/services/T00/B00/xxx", true)]
1227	fn test_validate_webhook_url_accepts_valid_urls(#[case] url: &str, #[case] _valid: bool) {
1228		// Act
1229		let result = validate_webhook_url(url);
1230
1231		// Assert
1232		assert!(
1233			result.is_ok(),
1234			"URL {} should be valid: {:?}",
1235			url,
1236			result.err()
1237		);
1238	}
1239
1240	#[rstest]
1241	#[case("http://example.com/webhook", "SchemeNotAllowed")]
1242	#[case("ftp://example.com/file", "SchemeNotAllowed")]
1243	#[case("not-a-url", "InvalidUrl")]
1244	#[case("https://127.0.0.1/webhook", "BlockedIpAddress")]
1245	#[case("https://10.0.0.1/webhook", "BlockedIpAddress")]
1246	#[case("https://172.16.0.1/webhook", "BlockedIpAddress")]
1247	#[case("https://192.168.1.1/webhook", "BlockedIpAddress")]
1248	#[case("https://169.254.169.254/latest/meta-data/", "BlockedIpAddress")]
1249	#[case("https://[::1]/webhook", "BlockedIpAddress")]
1250	#[case("https://[fe80::1]/webhook", "BlockedIpAddress")]
1251	#[case("https://[fc00::1]/webhook", "BlockedIpAddress")]
1252	#[case("https://localhost/webhook", "BlockedIpAddress")]
1253	#[case("https://sub.localhost/webhook", "BlockedIpAddress")]
1254	#[case("https://service.internal/webhook", "BlockedIpAddress")]
1255	#[case("https://printer.local/webhook", "BlockedIpAddress")]
1256	fn test_validate_webhook_url_rejects_unsafe_urls(
1257		#[case] url: &str,
1258		#[case] expected_error: &str,
1259	) {
1260		// Act
1261		let result = validate_webhook_url(url);
1262
1263		// Assert
1264		assert!(result.is_err(), "URL {} should be rejected", url);
1265		let err = result.unwrap_err();
1266		let err_name = match &err {
1267			WebhookError::InvalidUrl(_) => "InvalidUrl",
1268			WebhookError::SchemeNotAllowed(_) => "SchemeNotAllowed",
1269			WebhookError::BlockedIpAddress(_) => "BlockedIpAddress",
1270			WebhookError::DnsResolutionFailed(_) => "DnsResolutionFailed",
1271			_ => "Other",
1272		};
1273		assert_eq!(
1274			err_name, expected_error,
1275			"URL {} should produce {} error, got: {}",
1276			url, expected_error, err
1277		);
1278	}
1279
1280	#[rstest]
1281	fn test_validate_webhook_url_blocks_cloud_metadata_endpoint() {
1282		// Arrange
1283		let metadata_urls = [
1284			"https://169.254.169.254/latest/meta-data/",
1285			"https://169.254.169.254/computeMetadata/v1/",
1286			"https://169.254.170.2/v2/credentials",
1287		];
1288
1289		for url in &metadata_urls {
1290			// Act
1291			let result = validate_webhook_url(url);
1292
1293			// Assert
1294			assert!(
1295				result.is_err(),
1296				"Cloud metadata URL {} should be blocked",
1297				url
1298			);
1299			assert!(
1300				matches!(result.unwrap_err(), WebhookError::BlockedIpAddress(_)),
1301				"Cloud metadata URL {} should produce BlockedIpAddress error",
1302				url
1303			);
1304		}
1305	}
1306
1307	#[rstest]
1308	fn test_webhook_error_display_ssrf_variants() {
1309		// Arrange & Act & Assert
1310		let error = WebhookError::InvalidUrl("bad-url".to_string());
1311		assert_eq!(error.to_string(), "Invalid webhook URL: bad-url");
1312
1313		let error = WebhookError::SchemeNotAllowed("http".to_string());
1314		assert_eq!(
1315			error.to_string(),
1316			"URL scheme not allowed: http. Only HTTPS is permitted for webhooks"
1317		);
1318
1319		let error = WebhookError::BlockedIpAddress("127.0.0.1".to_string());
1320		assert_eq!(
1321			error.to_string(),
1322			"Webhook URL resolves to blocked IP address: 127.0.0.1"
1323		);
1324
1325		let error = WebhookError::DnsResolutionFailed("bad.host".to_string());
1326		assert_eq!(
1327			error.to_string(),
1328			"DNS resolution failed for webhook URL host: bad.host"
1329		);
1330	}
1331
1332	#[rstest]
1333	#[tokio::test]
1334	async fn test_send_rejects_http_url_via_ssrf_validation() {
1335		// Arrange
1336		let config = WebhookConfig {
1337			url: "http://example.com/webhook".to_string(),
1338			method: "POST".to_string(),
1339			headers: HashMap::new(),
1340			timeout: Duration::from_secs(5),
1341			retry_config: RetryConfig::default(),
1342		};
1343		let sender = HttpWebhookSender::new(config);
1344		let now = Utc::now();
1345		let event = WebhookEvent {
1346			task_id: TaskId::new(),
1347			task_name: "test_task".to_string(),
1348			status: TaskStatus::Success,
1349			result: None,
1350			error: None,
1351			started_at: now,
1352			completed_at: now,
1353			duration_ms: 0,
1354		};
1355
1356		// Act
1357		let result = sender.send(&event).await;
1358
1359		// Assert
1360		assert!(result.is_err());
1361		assert!(matches!(
1362			result.unwrap_err(),
1363			WebhookError::SchemeNotAllowed(_)
1364		));
1365	}
1366
1367	#[rstest]
1368	#[tokio::test]
1369	async fn test_send_rejects_private_ip_via_ssrf_validation() {
1370		// Arrange
1371		let config = WebhookConfig {
1372			url: "https://192.168.1.1/webhook".to_string(),
1373			method: "POST".to_string(),
1374			headers: HashMap::new(),
1375			timeout: Duration::from_secs(5),
1376			retry_config: RetryConfig::default(),
1377		};
1378		let sender = HttpWebhookSender::new(config);
1379		let now = Utc::now();
1380		let event = WebhookEvent {
1381			task_id: TaskId::new(),
1382			task_name: "test_task".to_string(),
1383			status: TaskStatus::Success,
1384			result: None,
1385			error: None,
1386			started_at: now,
1387			completed_at: now,
1388			duration_ms: 0,
1389		};
1390
1391		// Act
1392		let result = sender.send(&event).await;
1393
1394		// Assert
1395		assert!(result.is_err());
1396		assert!(matches!(
1397			result.unwrap_err(),
1398			WebhookError::BlockedIpAddress(_)
1399		));
1400	}
1401
1402	// Regression tests for #742: the retry loop MUST call tokio::time::sleep between
1403	// each failed attempt. Without the sleep, retries would spin at CPU speed and
1404	// flood the upstream server. The parametrized cases cover 1, 2, and 3 retries
1405	// with a fixed initial_backoff so elapsed time is predictable.
1406
	// The minimum elapsed time is the sum of the nominal backoff delays scaled by
	// the worst-case jitter (-25%, i.e. x 0.75), with extra slack on top:
	//   case_1: 1 retry,   50ms x 0.75 ≈ 37ms               → assert ≥ 30ms
	//   case_2: 2 retries, (50 + 100)ms x 0.75 ≈ 112ms       → assert ≥ 80ms
	//   case_3: 3 retries, (50 + 100 + 200)ms x 0.75 ≈ 262ms → assert ≥ 200ms
1411	#[rstest]
1412	#[case(1, Duration::from_millis(30), Duration::from_millis(50))]
1413	#[case(2, Duration::from_millis(80), Duration::from_millis(50))]
1414	#[case(3, Duration::from_millis(200), Duration::from_millis(50))]
1415	#[tokio::test]
1416	async fn test_webhook_retry_sleep_is_called_between_attempts(
1417		#[case] max_retries: u32,
1418		#[case] min_elapsed: Duration,
1419		#[case] initial_backoff: Duration,
1420	) {
1421		// Arrange - all server responses fail so the full retry sequence is exercised.
1422		let mut server = mockito::Server::new_async().await;
1423
1424		// Expect initial attempt + max_retries retries
1425		let _mock = server
1426			.mock("POST", "/webhook")
1427			.with_status(500)
1428			.expect((max_retries + 1) as usize)
1429			.create_async()
1430			.await;
1431
1432		let config = WebhookConfig {
1433			url: format!("{}/webhook", server.url()),
1434			method: "POST".to_string(),
1435			headers: HashMap::new(),
1436			timeout: Duration::from_secs(5),
1437			retry_config: RetryConfig {
1438				max_retries,
1439				initial_backoff,
1440				max_backoff: Duration::from_secs(1),
1441				backoff_multiplier: 2.0,
1442			},
1443		};
1444
1445		let sender = HttpWebhookSender::new(config);
1446		let now = Utc::now();
1447		let event = WebhookEvent {
1448			task_id: TaskId::new(),
1449			task_name: "regression_742".to_string(),
1450			status: TaskStatus::Success,
1451			result: None,
1452			error: None,
1453			started_at: now,
1454			completed_at: now,
1455			duration_ms: 0,
1456		};
1457
1458		// Act - measure total elapsed time across all retries
1459		let start = std::time::Instant::now();
1460		let result = sender.send_with_retry(&event).await;
1461		let elapsed = start.elapsed();
1462
1463		// Assert - at least one backoff sleep must have occurred, so elapsed time
1464		// must exceed min_elapsed. Without sleep the loop would complete in near-zero
1465		// wall time (only network round-trip overhead from mockito).
1466		assert!(
1467			result.is_err(),
1468			"expected MaxRetriesExceeded after all retries"
1469		);
1470		assert!(
1471			elapsed >= min_elapsed,
1472			"Regression #742: expected sleep between retries (>={:?}), got {:?}",
1473			min_elapsed,
1474			elapsed
1475		);
1476	}
1477}