Skip to main content

fraiseql_core/runtime/subscription/
webhook.rs

1use async_trait::async_trait;
2use serde::Serialize;
3
4use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
5
6/// Webhook transport adapter configuration.
7#[derive(Debug, Clone)]
8pub struct WebhookTransportConfig {
9    /// Target URL for webhook delivery.
10    pub url: String,
11
12    /// Secret key for HMAC-SHA256 signature.
13    pub secret: Option<String>,
14
15    /// Request timeout in milliseconds.
16    pub timeout_ms: u64,
17
18    /// Maximum retry attempts.
19    pub max_retries: u32,
20
21    /// Initial retry delay in milliseconds (exponential backoff).
22    pub retry_delay_ms: u64,
23
24    /// Custom headers to include in requests.
25    pub headers: std::collections::HashMap<String, String>,
26}
27
28impl WebhookTransportConfig {
29    /// Create a new webhook configuration.
30    #[must_use]
31    pub fn new(url: impl Into<String>) -> Self {
32        Self {
33            url:            url.into(),
34            secret:         None,
35            timeout_ms:     30_000,
36            max_retries:    3,
37            retry_delay_ms: 1000,
38            headers:        std::collections::HashMap::new(),
39        }
40    }
41
42    /// Set the signing secret for HMAC-SHA256 signatures.
43    #[must_use]
44    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
45        self.secret = Some(secret.into());
46        self
47    }
48
49    /// Set the request timeout.
50    #[must_use]
51    pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
52        self.timeout_ms = timeout_ms;
53        self
54    }
55
56    /// Set maximum retry attempts.
57    #[must_use]
58    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
59        self.max_retries = max_retries;
60        self
61    }
62
63    /// Set initial retry delay.
64    #[must_use]
65    pub const fn with_retry_delay(mut self, delay_ms: u64) -> Self {
66        self.retry_delay_ms = delay_ms;
67        self
68    }
69
70    /// Add a custom header.
71    #[must_use]
72    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
73        self.headers.insert(name.into(), value.into());
74        self
75    }
76}
77
78/// Webhook payload format for event delivery.
79#[derive(Debug, Clone, Serialize)]
80pub struct WebhookPayload {
81    /// Unique event identifier.
82    pub event_id: String,
83
84    /// Subscription name that triggered the event.
85    pub subscription_name: String,
86
87    /// Entity type (e.g., "Order").
88    pub entity_type: String,
89
90    /// Entity primary key.
91    pub entity_id: String,
92
93    /// Operation type.
94    pub operation: String,
95
96    /// Event data.
97    pub data: serde_json::Value,
98
99    /// Previous data (for UPDATE operations).
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub old_data: Option<serde_json::Value>,
102
103    /// Event timestamp (ISO 8601).
104    pub timestamp: String,
105
106    /// Sequence number for ordering.
107    pub sequence_number: u64,
108}
109
110impl WebhookPayload {
111    /// Create a webhook payload from a subscription event.
112    #[must_use]
113    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
114        Self {
115            event_id:          event.event_id.clone(),
116            subscription_name: subscription_name.to_string(),
117            entity_type:       event.entity_type.clone(),
118            entity_id:         event.entity_id.clone(),
119            operation:         format!("{:?}", event.operation),
120            data:              event.data.clone(),
121            old_data:          event.old_data.clone(),
122            timestamp:         event.timestamp.to_rfc3339(),
123            sequence_number:   event.sequence_number,
124        }
125    }
126}
127
128/// Webhook transport adapter for HTTP POST delivery.
129///
130/// Delivers subscription events via HTTP POST with:
131/// - HMAC-SHA256 signature (X-FraiseQL-Signature header)
132/// - Exponential backoff retry logic
133/// - Configurable timeouts
134///
135/// # Example
136///
137/// ```no_run
138/// // Requires: live HTTP endpoint for webhook delivery.
139/// use fraiseql_core::runtime::subscription::{
140///     WebhookAdapter, WebhookTransportConfig, SubscriptionEvent, TransportAdapter,
141/// };
142///
143/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
144/// # let event: SubscriptionEvent = panic!("example");
145/// let config = WebhookTransportConfig::new("https://api.example.com/webhooks")
146///     .with_secret("my_secret_key")
147///     .with_max_retries(3);
148///
149/// let adapter = WebhookAdapter::new(config)?;
150/// adapter.deliver(&event, "orderCreated").await?;
151/// # Ok(())
152/// # }
153/// ```
154pub struct WebhookAdapter {
155    config: WebhookTransportConfig,
156    client: reqwest::Client,
157}
158
159impl WebhookAdapter {
160    /// Create a new webhook adapter.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the URL targets a private/reserved IP (SSRF protection),
165    /// or if the underlying HTTP client cannot be initialized.
166    pub fn new(config: WebhookTransportConfig) -> Result<Self, SubscriptionError> {
167        validate_webhook_url(&config.url)?;
168
169        let client = reqwest::Client::builder()
170            .timeout(std::time::Duration::from_millis(config.timeout_ms))
171            .build()
172            .map_err(|e| SubscriptionError::Internal(format!("HTTP client init failed: {e}")))?;
173
174        Ok(Self { config, client })
175    }
176
177    /// Compute HMAC-SHA256 signature for payload.
178    fn compute_signature(&self, payload: &str) -> Option<String> {
179        use hmac::{Hmac, Mac};
180        use sha2::Sha256;
181
182        let secret = self.config.secret.as_ref()?;
183
184        #[allow(clippy::expect_used)]
185        // Reason: SHA-256 HMAC accepts keys of any size; new_from_slice cannot fail here
186        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
187            .expect("SHA-256 HMAC accepts any key size");
188        mac.update(payload.as_bytes());
189
190        let result = mac.finalize();
191        Some(hex::encode(result.into_bytes()))
192    }
193}
194
195// Reason: TransportAdapter is defined with #[async_trait]; all implementations must match
196// its transformed method signatures to satisfy the trait contract
197// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
198#[async_trait]
199impl TransportAdapter for WebhookAdapter {
200    async fn deliver(
201        &self,
202        event: &SubscriptionEvent,
203        subscription_name: &str,
204    ) -> Result<(), SubscriptionError> {
205        let payload = WebhookPayload::from_event(event, subscription_name);
206        let payload_json = serde_json::to_string(&payload).map_err(|e| {
207            SubscriptionError::Internal(format!("Failed to serialize payload: {e}"))
208        })?;
209
210        let mut attempt = 0;
211        let mut delay = self.config.retry_delay_ms;
212
213        loop {
214            attempt += 1;
215
216            let mut request = self
217                .client
218                .post(&self.config.url)
219                .header("Content-Type", "application/json")
220                .header("X-FraiseQL-Event-Id", &event.event_id)
221                .header("X-FraiseQL-Event-Type", subscription_name);
222
223            // Add signature if secret is configured
224            if let Some(signature) = self.compute_signature(&payload_json) {
225                request = request.header("X-FraiseQL-Signature", format!("sha256={signature}"));
226            }
227
228            // Add custom headers
229            for (name, value) in &self.config.headers {
230                request = request.header(name, value);
231            }
232
233            let result = request.body(payload_json.clone()).send().await;
234
235            match result {
236                Ok(response) if response.status().is_success() => {
237                    tracing::debug!(
238                        url = %self.config.url,
239                        event_id = %event.event_id,
240                        attempt = attempt,
241                        "Webhook delivered successfully"
242                    );
243                    return Ok(());
244                },
245                Ok(response) => {
246                    let status = response.status();
247                    tracing::warn!(
248                        url = %self.config.url,
249                        event_id = %event.event_id,
250                        status = %status,
251                        attempt = attempt,
252                        "Webhook delivery failed with status"
253                    );
254
255                    // Don't retry on client errors (4xx) except 429
256                    if status.is_client_error() && status.as_u16() != 429 {
257                        return Err(SubscriptionError::Internal(format!(
258                            "Webhook delivery failed: {status}"
259                        )));
260                    }
261                },
262                Err(e) => {
263                    tracing::warn!(
264                        url = %self.config.url,
265                        event_id = %event.event_id,
266                        error = %e,
267                        attempt = attempt,
268                        "Webhook delivery error"
269                    );
270                },
271            }
272
273            // Check if we should retry
274            if attempt >= self.config.max_retries {
275                return Err(SubscriptionError::Internal(format!(
276                    "Webhook delivery failed after {} attempts",
277                    attempt
278                )));
279            }
280
281            // Exponential backoff
282            tracing::debug!(delay_ms = delay, "Retrying webhook delivery");
283            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
284            delay *= 2;
285        }
286    }
287
288    fn name(&self) -> &'static str {
289        "webhook"
290    }
291
292    async fn health_check(&self) -> bool {
293        // Simple health check - verify URL is reachable
294        match self.client.head(&self.config.url).send().await {
295            Ok(response) => response.status().is_success() || response.status().as_u16() == 405,
296            Err(_) => false,
297        }
298    }
299}
300
301impl std::fmt::Debug for WebhookAdapter {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        f.debug_struct("WebhookAdapter")
304            .field("url", &self.config.url)
305            .field("has_secret", &self.config.secret.is_some())
306            .finish_non_exhaustive()
307    }
308}
309
310/// Validate a webhook target URL for SSRF risks.
311///
312/// Rejects URLs targeting private/loopback/link-local addresses to prevent
313/// server-side request forgery via attacker-controlled webhook configurations.
314///
315/// # Errors
316///
317/// Returns `SubscriptionError::Internal` if the URL is invalid or targets a
318/// forbidden host (private IP, loopback, link-local).
319pub fn validate_webhook_url(url: &str) -> Result<(), SubscriptionError> {
320    let parsed = reqwest::Url::parse(url)
321        .map_err(|e| SubscriptionError::Internal(format!("Invalid webhook URL '{url}': {e}")))?;
322
323    let host_raw = parsed
324        .host_str()
325        .ok_or_else(|| SubscriptionError::Internal(format!("Webhook URL has no host: {url}")))?;
326
327    // Strip IPv6 brackets added by the url crate (e.g. "[::1]" → "::1").
328    let host = if host_raw.starts_with('[') && host_raw.ends_with(']') {
329        &host_raw[1..host_raw.len() - 1]
330    } else {
331        host_raw
332    };
333
334    let lower_host = host.to_ascii_lowercase();
335    if lower_host == "localhost" || lower_host.ends_with(".localhost") {
336        return Err(SubscriptionError::Internal(format!(
337            "Webhook URL targets a loopback host ({host}) — SSRF protection blocked"
338        )));
339    }
340
341    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
342        if is_webhook_ssrf_blocked_ip(&ip) {
343            return Err(SubscriptionError::Internal(format!(
344                "Webhook URL targets a private/reserved IP ({ip}) — SSRF protection blocked"
345            )));
346        }
347    }
348
349    Ok(())
350}
351
352/// Returns `true` for IP ranges that webhook delivery must never contact.
353fn is_webhook_ssrf_blocked_ip(ip: &std::net::IpAddr) -> bool {
354    match ip {
355        std::net::IpAddr::V4(v4) => {
356            let o = v4.octets();
357            o[0] == 127                                          // loopback 127/8
358            || o[0] == 10                                        // RFC 1918 10/8
359            || (o[0] == 172 && (16..=31).contains(&o[1]))       // RFC 1918 172.16/12
360            || (o[0] == 192 && o[1] == 168)                     // RFC 1918 192.168/16
361            || (o[0] == 169 && o[1] == 254)                     // link-local 169.254/16
362            || (o[0] == 100 && (64..=127).contains(&o[1]))      // CGNAT 100.64/10
363            || o == [0, 0, 0, 0] // unspecified
364        },
365        std::net::IpAddr::V6(v6) => {
366            v6.is_loopback()                                     // ::1
367            || v6.is_unspecified()                               // ::
368            || {
369                let s = v6.segments();
370                (s[0] & 0xfe00) == 0xfc00                        // ULA fc00::/7
371                || (s[0] & 0xffc0) == 0xfe80                    // link-local fe80::/10
372                || (s[0] == 0 && s[1] == 0 && s[2] == 0        // ::ffff:0:0/96
373                    && s[3] == 0 && s[4] == 0 && s[5] == 0xffff)
374            }
375        },
376    }
377}