fraiseql_core/runtime/subscription/
webhook.rs1use async_trait::async_trait;
2use serde::Serialize;
3
4use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
5
/// Configuration for delivering subscription events to an HTTP webhook.
///
/// Create one with [`WebhookTransportConfig::new`] and refine it with the chained
/// `with_*` methods.
///
/// `Debug` is implemented by hand so that the signing secret and custom header values
/// (which commonly carry credentials) never end up in logs.
#[derive(Clone)]
pub struct WebhookTransportConfig {
    /// Destination URL of the webhook endpoint.
    pub url: String,

    /// Optional shared secret used to sign the request body; `None` disables signing.
    pub secret: Option<String>,

    /// Per-request timeout, in milliseconds.
    pub timeout_ms: u64,

    /// Upper bound on delivery attempts before giving up.
    pub max_retries: u32,

    /// Delay before the first retry, in milliseconds.
    pub retry_delay_ms: u64,

    /// Extra HTTP headers attached to every request.
    pub headers: std::collections::HashMap<String, String>,
}

impl std::fmt::Debug for WebhookTransportConfig {
    /// Formats the configuration without exposing the secret or any header value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Sorted so the output is stable regardless of `HashMap` iteration order.
        let mut header_names: Vec<&str> = self.headers.keys().map(String::as_str).collect();
        header_names.sort_unstable();

        f.debug_struct("WebhookTransportConfig")
            .field("url", &self.url)
            .field("has_secret", &self.secret.is_some())
            .field("timeout_ms", &self.timeout_ms)
            .field("max_retries", &self.max_retries)
            .field("retry_delay_ms", &self.retry_delay_ms)
            .field("header_names", &header_names)
            .finish()
    }
}

impl WebhookTransportConfig {
    /// Creates a configuration for `url` with the defaults: no secret, a 30 s timeout,
    /// 3 attempts, a 1 s initial retry delay and no extra headers.
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            secret: None,
            timeout_ms: 30_000,
            max_retries: 3,
            retry_delay_ms: 1000,
            headers: std::collections::HashMap::new(),
        }
    }

    /// Sets the shared secret used to sign request bodies.
    #[must_use]
    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
        self.secret = Some(secret.into());
        self
    }

    /// Sets the per-request timeout in milliseconds.
    #[must_use]
    pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Sets the upper bound on delivery attempts.
    #[must_use]
    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Sets the delay before the first retry, in milliseconds.
    #[must_use]
    pub const fn with_retry_delay(mut self, delay_ms: u64) -> Self {
        self.retry_delay_ms = delay_ms;
        self
    }

    /// Adds (or replaces) an extra header sent with every request.
    #[must_use]
    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(name.into(), value.into());
        self
    }
}
77
78#[derive(Debug, Clone, Serialize)]
80pub struct WebhookPayload {
81 pub event_id: String,
83
84 pub subscription_name: String,
86
87 pub entity_type: String,
89
90 pub entity_id: String,
92
93 pub operation: String,
95
96 pub data: serde_json::Value,
98
99 #[serde(skip_serializing_if = "Option::is_none")]
101 pub old_data: Option<serde_json::Value>,
102
103 pub timestamp: String,
105
106 pub sequence_number: u64,
108}
109
110impl WebhookPayload {
111 #[must_use]
113 pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
114 Self {
115 event_id: event.event_id.clone(),
116 subscription_name: subscription_name.to_string(),
117 entity_type: event.entity_type.clone(),
118 entity_id: event.entity_id.clone(),
119 operation: format!("{:?}", event.operation),
120 data: event.data.clone(),
121 old_data: event.old_data.clone(),
122 timestamp: event.timestamp.to_rfc3339(),
123 sequence_number: event.sequence_number,
124 }
125 }
126}
127
128pub struct WebhookAdapter {
155 config: WebhookTransportConfig,
156 client: reqwest::Client,
157}
158
159impl WebhookAdapter {
160 pub fn new(config: WebhookTransportConfig) -> Result<Self, SubscriptionError> {
167 validate_webhook_url(&config.url)?;
168
169 let client = reqwest::Client::builder()
170 .timeout(std::time::Duration::from_millis(config.timeout_ms))
171 .build()
172 .map_err(|e| SubscriptionError::Internal(format!("HTTP client init failed: {e}")))?;
173
174 Ok(Self { config, client })
175 }
176
177 fn compute_signature(&self, payload: &str) -> Option<String> {
179 use hmac::{Hmac, Mac};
180 use sha2::Sha256;
181
182 let secret = self.config.secret.as_ref()?;
183
184 #[allow(clippy::expect_used)]
185 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
187 .expect("SHA-256 HMAC accepts any key size");
188 mac.update(payload.as_bytes());
189
190 let result = mac.finalize();
191 Some(hex::encode(result.into_bytes()))
192 }
193}
194
195#[async_trait]
199impl TransportAdapter for WebhookAdapter {
200 async fn deliver(
201 &self,
202 event: &SubscriptionEvent,
203 subscription_name: &str,
204 ) -> Result<(), SubscriptionError> {
205 let payload = WebhookPayload::from_event(event, subscription_name);
206 let payload_json = serde_json::to_string(&payload).map_err(|e| {
207 SubscriptionError::Internal(format!("Failed to serialize payload: {e}"))
208 })?;
209
210 let mut attempt = 0;
211 let mut delay = self.config.retry_delay_ms;
212
213 loop {
214 attempt += 1;
215
216 let mut request = self
217 .client
218 .post(&self.config.url)
219 .header("Content-Type", "application/json")
220 .header("X-FraiseQL-Event-Id", &event.event_id)
221 .header("X-FraiseQL-Event-Type", subscription_name);
222
223 if let Some(signature) = self.compute_signature(&payload_json) {
225 request = request.header("X-FraiseQL-Signature", format!("sha256={signature}"));
226 }
227
228 for (name, value) in &self.config.headers {
230 request = request.header(name, value);
231 }
232
233 let result = request.body(payload_json.clone()).send().await;
234
235 match result {
236 Ok(response) if response.status().is_success() => {
237 tracing::debug!(
238 url = %self.config.url,
239 event_id = %event.event_id,
240 attempt = attempt,
241 "Webhook delivered successfully"
242 );
243 return Ok(());
244 },
245 Ok(response) => {
246 let status = response.status();
247 tracing::warn!(
248 url = %self.config.url,
249 event_id = %event.event_id,
250 status = %status,
251 attempt = attempt,
252 "Webhook delivery failed with status"
253 );
254
255 if status.is_client_error() && status.as_u16() != 429 {
257 return Err(SubscriptionError::Internal(format!(
258 "Webhook delivery failed: {status}"
259 )));
260 }
261 },
262 Err(e) => {
263 tracing::warn!(
264 url = %self.config.url,
265 event_id = %event.event_id,
266 error = %e,
267 attempt = attempt,
268 "Webhook delivery error"
269 );
270 },
271 }
272
273 if attempt >= self.config.max_retries {
275 return Err(SubscriptionError::Internal(format!(
276 "Webhook delivery failed after {} attempts",
277 attempt
278 )));
279 }
280
281 tracing::debug!(delay_ms = delay, "Retrying webhook delivery");
283 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
284 delay *= 2;
285 }
286 }
287
288 fn name(&self) -> &'static str {
289 "webhook"
290 }
291
292 async fn health_check(&self) -> bool {
293 match self.client.head(&self.config.url).send().await {
295 Ok(response) => response.status().is_success() || response.status().as_u16() == 405,
296 Err(_) => false,
297 }
298 }
299}
300
301impl std::fmt::Debug for WebhookAdapter {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 f.debug_struct("WebhookAdapter")
304 .field("url", &self.config.url)
305 .field("has_secret", &self.config.secret.is_some())
306 .finish_non_exhaustive()
307 }
308}
309
310pub fn validate_webhook_url(url: &str) -> Result<(), SubscriptionError> {
320 let parsed = reqwest::Url::parse(url)
321 .map_err(|e| SubscriptionError::Internal(format!("Invalid webhook URL '{url}': {e}")))?;
322
323 let host_raw = parsed
324 .host_str()
325 .ok_or_else(|| SubscriptionError::Internal(format!("Webhook URL has no host: {url}")))?;
326
327 let host = if host_raw.starts_with('[') && host_raw.ends_with(']') {
329 &host_raw[1..host_raw.len() - 1]
330 } else {
331 host_raw
332 };
333
334 let lower_host = host.to_ascii_lowercase();
335 if lower_host == "localhost" || lower_host.ends_with(".localhost") {
336 return Err(SubscriptionError::Internal(format!(
337 "Webhook URL targets a loopback host ({host}) — SSRF protection blocked"
338 )));
339 }
340
341 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
342 if is_webhook_ssrf_blocked_ip(&ip) {
343 return Err(SubscriptionError::Internal(format!(
344 "Webhook URL targets a private/reserved IP ({ip}) — SSRF protection blocked"
345 )));
346 }
347 }
348
349 Ok(())
350}
351
/// Returns `true` when `ip` must not be used as a webhook target.
///
/// Blocked IPv4 ranges: `0.0.0.0/8` ("this network"), loopback `127.0.0.0/8`, the
/// RFC 1918 private ranges, link-local `169.254.0.0/16` and shared address space
/// `100.64.0.0/10`.
///
/// Blocked IPv6 ranges: loopback `::1`, unspecified `::`, unique-local `fc00::/7`,
/// link-local `fe80::/10`, and every prefix that embeds an IPv4 address — IPv4-mapped
/// `::ffff:0:0/96`, IPv4-compatible `::/96` and NAT64 `64:ff9b::/96` — because such
/// addresses can be translated to an internal IPv4 host.
fn is_webhook_ssrf_blocked_ip(ip: &std::net::IpAddr) -> bool {
    match ip {
        std::net::IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            let is_this_network = first == 0;
            // 100.64.0.0/10 has no stable std predicate.
            let is_shared = first == 100 && (64..=127).contains(&second);

            is_this_network
                || v4.is_loopback()
                || v4.is_private()
                || v4.is_link_local()
                || is_shared
        },
        std::net::IpAddr::V6(v6) => {
            let s = v6.segments();
            let is_unique_local = (s[0] & 0xfe00) == 0xfc00;
            let is_link_local = (s[0] & 0xffc0) == 0xfe80;
            let is_ipv4_mapped = s[..5].iter().all(|&seg| seg == 0) && s[5] == 0xffff;
            // ::/96 also contains `::` and `::1`; they stay listed explicitly below.
            let is_ipv4_compatible = s[..6].iter().all(|&seg| seg == 0);
            let is_nat64 =
                s[0] == 0x0064 && s[1] == 0xff9b && s[2..6].iter().all(|&seg| seg == 0);

            v6.is_loopback()
                || v6.is_unspecified()
                || is_unique_local
                || is_link_local
                || is_ipv4_mapped
                || is_ipv4_compatible
                || is_nat64
        },
    }
}