Skip to main content

fraiseql_core/runtime/subscription/
webhook.rs

1use serde::Serialize;
2
3use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
4
/// Webhook transport adapter configuration.
///
/// `Debug` is implemented by hand rather than derived so that the signing
/// secret and custom header values (which often carry credentials such as
/// `Authorization` tokens) are never written to logs.
#[derive(Clone)]
pub struct WebhookConfig {
    /// Target URL for webhook delivery.
    pub url: String,

    /// Secret key for HMAC-SHA256 signature.
    pub secret: Option<String>,

    /// Request timeout in milliseconds.
    pub timeout_ms: u64,

    /// Maximum retry attempts.
    pub max_retries: u32,

    /// Initial retry delay in milliseconds (exponential backoff).
    pub retry_delay_ms: u64,

    /// Custom headers to include in requests.
    pub headers: std::collections::HashMap<String, String>,
}

impl std::fmt::Debug for WebhookConfig {
    /// Format the configuration without exposing sensitive values.
    ///
    /// The secret is reduced to a `has_secret` flag and only the *names* of
    /// the custom headers are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Sort the names so the output does not depend on HashMap iteration order.
        let mut header_names: Vec<&str> = self.headers.keys().map(String::as_str).collect();
        header_names.sort_unstable();

        f.debug_struct("WebhookConfig")
            .field("url", &self.url)
            .field("has_secret", &self.secret.is_some())
            .field("timeout_ms", &self.timeout_ms)
            .field("max_retries", &self.max_retries)
            .field("retry_delay_ms", &self.retry_delay_ms)
            .field("header_names", &header_names)
            .finish()
    }
}
26
27impl WebhookConfig {
28    /// Create a new webhook configuration.
29    #[must_use]
30    pub fn new(url: impl Into<String>) -> Self {
31        Self {
32            url:            url.into(),
33            secret:         None,
34            timeout_ms:     30_000,
35            max_retries:    3,
36            retry_delay_ms: 1000,
37            headers:        std::collections::HashMap::new(),
38        }
39    }
40
41    /// Set the signing secret for HMAC-SHA256 signatures.
42    #[must_use]
43    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
44        self.secret = Some(secret.into());
45        self
46    }
47
48    /// Set the request timeout.
49    #[must_use]
50    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
51        self.timeout_ms = timeout_ms;
52        self
53    }
54
55    /// Set maximum retry attempts.
56    #[must_use]
57    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
58        self.max_retries = max_retries;
59        self
60    }
61
62    /// Set initial retry delay.
63    #[must_use]
64    pub fn with_retry_delay(mut self, delay_ms: u64) -> Self {
65        self.retry_delay_ms = delay_ms;
66        self
67    }
68
69    /// Add a custom header.
70    #[must_use]
71    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
72        self.headers.insert(name.into(), value.into());
73        self
74    }
75}
76
77/// Webhook payload format for event delivery.
78#[derive(Debug, Clone, Serialize)]
79pub struct WebhookPayload {
80    /// Unique event identifier.
81    pub event_id: String,
82
83    /// Subscription name that triggered the event.
84    pub subscription_name: String,
85
86    /// Entity type (e.g., "Order").
87    pub entity_type: String,
88
89    /// Entity primary key.
90    pub entity_id: String,
91
92    /// Operation type.
93    pub operation: String,
94
95    /// Event data.
96    pub data: serde_json::Value,
97
98    /// Previous data (for UPDATE operations).
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub old_data: Option<serde_json::Value>,
101
102    /// Event timestamp (ISO 8601).
103    pub timestamp: String,
104
105    /// Sequence number for ordering.
106    pub sequence_number: u64,
107}
108
109impl WebhookPayload {
110    /// Create a webhook payload from a subscription event.
111    #[must_use]
112    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
113        Self {
114            event_id:          event.event_id.clone(),
115            subscription_name: subscription_name.to_string(),
116            entity_type:       event.entity_type.clone(),
117            entity_id:         event.entity_id.clone(),
118            operation:         format!("{:?}", event.operation),
119            data:              event.data.clone(),
120            old_data:          event.old_data.clone(),
121            timestamp:         event.timestamp.to_rfc3339(),
122            sequence_number:   event.sequence_number,
123        }
124    }
125}
126
127/// Webhook transport adapter for HTTP POST delivery.
128///
129/// Delivers subscription events via HTTP POST with:
130/// - HMAC-SHA256 signature (X-FraiseQL-Signature header)
131/// - Exponential backoff retry logic
132/// - Configurable timeouts
133///
134/// # Example
135///
136/// ```ignore
137/// use fraiseql_core::runtime::subscription::{WebhookAdapter, WebhookConfig};
138///
139/// let config = WebhookConfig::new("https://api.example.com/webhooks")
140///     .with_secret("my_secret_key")
141///     .with_max_retries(3);
142///
143/// let adapter = WebhookAdapter::new(config);
144/// adapter.deliver(&event, "orderCreated").await?;
145/// ```
146pub struct WebhookAdapter {
147    config: WebhookConfig,
148    client: reqwest::Client,
149}
150
151impl WebhookAdapter {
152    /// Create a new webhook adapter.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the HTTP client cannot be built (should not happen in practice).
157    #[must_use]
158    pub fn new(config: WebhookConfig) -> Self {
159        let client = reqwest::Client::builder()
160            .timeout(std::time::Duration::from_millis(config.timeout_ms))
161            .build()
162            .expect("Failed to build HTTP client");
163
164        Self { config, client }
165    }
166
167    /// Compute HMAC-SHA256 signature for payload.
168    fn compute_signature(&self, payload: &str) -> Option<String> {
169        use hmac::{Hmac, Mac};
170        use sha2::Sha256;
171
172        let secret = self.config.secret.as_ref()?;
173
174        let mut mac =
175            Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("HMAC can take any size key");
176        mac.update(payload.as_bytes());
177
178        let result = mac.finalize();
179        Some(hex::encode(result.into_bytes()))
180    }
181}
182
183#[async_trait::async_trait]
184impl TransportAdapter for WebhookAdapter {
185    async fn deliver(
186        &self,
187        event: &SubscriptionEvent,
188        subscription_name: &str,
189    ) -> Result<(), SubscriptionError> {
190        let payload = WebhookPayload::from_event(event, subscription_name);
191        let payload_json = serde_json::to_string(&payload).map_err(|e| {
192            SubscriptionError::Internal(format!("Failed to serialize payload: {e}"))
193        })?;
194
195        let mut attempt = 0;
196        let mut delay = self.config.retry_delay_ms;
197
198        loop {
199            attempt += 1;
200
201            let mut request = self
202                .client
203                .post(&self.config.url)
204                .header("Content-Type", "application/json")
205                .header("X-FraiseQL-Event-Id", &event.event_id)
206                .header("X-FraiseQL-Event-Type", subscription_name);
207
208            // Add signature if secret is configured
209            if let Some(signature) = self.compute_signature(&payload_json) {
210                request = request.header("X-FraiseQL-Signature", format!("sha256={signature}"));
211            }
212
213            // Add custom headers
214            for (name, value) in &self.config.headers {
215                request = request.header(name, value);
216            }
217
218            let result = request.body(payload_json.clone()).send().await;
219
220            match result {
221                Ok(response) if response.status().is_success() => {
222                    tracing::debug!(
223                        url = %self.config.url,
224                        event_id = %event.event_id,
225                        attempt = attempt,
226                        "Webhook delivered successfully"
227                    );
228                    return Ok(());
229                },
230                Ok(response) => {
231                    let status = response.status();
232                    tracing::warn!(
233                        url = %self.config.url,
234                        event_id = %event.event_id,
235                        status = %status,
236                        attempt = attempt,
237                        "Webhook delivery failed with status"
238                    );
239
240                    // Don't retry on client errors (4xx) except 429
241                    if status.is_client_error() && status.as_u16() != 429 {
242                        return Err(SubscriptionError::Internal(format!(
243                            "Webhook delivery failed: {status}"
244                        )));
245                    }
246                },
247                Err(e) => {
248                    tracing::warn!(
249                        url = %self.config.url,
250                        event_id = %event.event_id,
251                        error = %e,
252                        attempt = attempt,
253                        "Webhook delivery error"
254                    );
255                },
256            }
257
258            // Check if we should retry
259            if attempt >= self.config.max_retries {
260                return Err(SubscriptionError::Internal(format!(
261                    "Webhook delivery failed after {} attempts",
262                    attempt
263                )));
264            }
265
266            // Exponential backoff
267            tracing::debug!(delay_ms = delay, "Retrying webhook delivery");
268            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
269            delay *= 2;
270        }
271    }
272
273    fn name(&self) -> &'static str {
274        "webhook"
275    }
276
277    async fn health_check(&self) -> bool {
278        // Simple health check - verify URL is reachable
279        match self.client.head(&self.config.url).send().await {
280            Ok(response) => response.status().is_success() || response.status().as_u16() == 405,
281            Err(_) => false,
282        }
283    }
284}
285
286impl std::fmt::Debug for WebhookAdapter {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("WebhookAdapter")
289            .field("url", &self.config.url)
290            .field("has_secret", &self.config.secret.is_some())
291            .finish_non_exhaustive()
292    }
293}