fraiseql_core/runtime/subscription/
webhook.rs1use serde::Serialize;
2
3use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
4
/// Configuration for delivering subscription events to an HTTP webhook.
///
/// Create one with [`WebhookConfig::new`] and refine it with the chainable
/// `with_*` methods.
///
/// `Debug` is implemented by hand so that the signing secret and the values of
/// custom headers (which may carry credentials) are never written to logs.
#[derive(Clone)]
pub struct WebhookConfig {
    /// Target URL that receives the `POST` request.
    pub url: String,

    /// Optional shared secret; when present, payloads are signed with HMAC-SHA256.
    pub secret: Option<String>,

    /// Per-request timeout of the HTTP client, in milliseconds.
    pub timeout_ms: u64,

    /// Upper bound on delivery attempts.
    ///
    /// The delivery loop stops as soon as the attempt counter reaches this value,
    /// so at least one attempt is always made (even when this is `0`).
    pub max_retries: u32,

    /// Delay before the first retry, in milliseconds; doubled after every failed attempt.
    pub retry_delay_ms: u64,

    /// Extra headers added to every webhook request.
    pub headers: std::collections::HashMap<String, String>,
}

impl std::fmt::Debug for WebhookConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only header names are shown; they are sorted so the output is stable
        // regardless of `HashMap` iteration order.
        let mut header_names: Vec<&str> = self.headers.keys().map(String::as_str).collect();
        header_names.sort_unstable();

        f.debug_struct("WebhookConfig")
            .field("url", &self.url)
            .field("has_secret", &self.secret.is_some())
            .field("timeout_ms", &self.timeout_ms)
            .field("max_retries", &self.max_retries)
            .field("retry_delay_ms", &self.retry_delay_ms)
            .field("header_names", &header_names)
            .finish()
    }
}

impl WebhookConfig {
    /// Creates a configuration for `url` with the defaults: no secret, a 30 s
    /// timeout, `max_retries` of 3, a 1 s initial retry delay and no extra headers.
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            secret: None,
            timeout_ms: 30_000,
            max_retries: 3,
            retry_delay_ms: 1000,
            headers: std::collections::HashMap::new(),
        }
    }

    /// Sets the shared secret used to sign payloads.
    #[must_use]
    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
        self.secret = Some(secret.into());
        self
    }

    /// Sets the per-request timeout in milliseconds.
    #[must_use]
    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Sets the upper bound on delivery attempts.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Sets the initial retry delay in milliseconds.
    #[must_use]
    pub fn with_retry_delay(mut self, delay_ms: u64) -> Self {
        self.retry_delay_ms = delay_ms;
        self
    }

    /// Adds a custom header; an existing header with the same name is replaced.
    #[must_use]
    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(name.into(), value.into());
        self
    }
}
76
77#[derive(Debug, Clone, Serialize)]
79pub struct WebhookPayload {
80 pub event_id: String,
82
83 pub subscription_name: String,
85
86 pub entity_type: String,
88
89 pub entity_id: String,
91
92 pub operation: String,
94
95 pub data: serde_json::Value,
97
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub old_data: Option<serde_json::Value>,
101
102 pub timestamp: String,
104
105 pub sequence_number: u64,
107}
108
109impl WebhookPayload {
110 #[must_use]
112 pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
113 Self {
114 event_id: event.event_id.clone(),
115 subscription_name: subscription_name.to_string(),
116 entity_type: event.entity_type.clone(),
117 entity_id: event.entity_id.clone(),
118 operation: format!("{:?}", event.operation),
119 data: event.data.clone(),
120 old_data: event.old_data.clone(),
121 timestamp: event.timestamp.to_rfc3339(),
122 sequence_number: event.sequence_number,
123 }
124 }
125}
126
127pub struct WebhookAdapter {
147 config: WebhookConfig,
148 client: reqwest::Client,
149}
150
151impl WebhookAdapter {
152 #[must_use]
158 pub fn new(config: WebhookConfig) -> Self {
159 let client = reqwest::Client::builder()
160 .timeout(std::time::Duration::from_millis(config.timeout_ms))
161 .build()
162 .expect("Failed to build HTTP client");
163
164 Self { config, client }
165 }
166
167 fn compute_signature(&self, payload: &str) -> Option<String> {
169 use hmac::{Hmac, Mac};
170 use sha2::Sha256;
171
172 let secret = self.config.secret.as_ref()?;
173
174 let mut mac =
175 Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("HMAC can take any size key");
176 mac.update(payload.as_bytes());
177
178 let result = mac.finalize();
179 Some(hex::encode(result.into_bytes()))
180 }
181}
182
183#[async_trait::async_trait]
184impl TransportAdapter for WebhookAdapter {
185 async fn deliver(
186 &self,
187 event: &SubscriptionEvent,
188 subscription_name: &str,
189 ) -> Result<(), SubscriptionError> {
190 let payload = WebhookPayload::from_event(event, subscription_name);
191 let payload_json = serde_json::to_string(&payload).map_err(|e| {
192 SubscriptionError::Internal(format!("Failed to serialize payload: {e}"))
193 })?;
194
195 let mut attempt = 0;
196 let mut delay = self.config.retry_delay_ms;
197
198 loop {
199 attempt += 1;
200
201 let mut request = self
202 .client
203 .post(&self.config.url)
204 .header("Content-Type", "application/json")
205 .header("X-FraiseQL-Event-Id", &event.event_id)
206 .header("X-FraiseQL-Event-Type", subscription_name);
207
208 if let Some(signature) = self.compute_signature(&payload_json) {
210 request = request.header("X-FraiseQL-Signature", format!("sha256={signature}"));
211 }
212
213 for (name, value) in &self.config.headers {
215 request = request.header(name, value);
216 }
217
218 let result = request.body(payload_json.clone()).send().await;
219
220 match result {
221 Ok(response) if response.status().is_success() => {
222 tracing::debug!(
223 url = %self.config.url,
224 event_id = %event.event_id,
225 attempt = attempt,
226 "Webhook delivered successfully"
227 );
228 return Ok(());
229 },
230 Ok(response) => {
231 let status = response.status();
232 tracing::warn!(
233 url = %self.config.url,
234 event_id = %event.event_id,
235 status = %status,
236 attempt = attempt,
237 "Webhook delivery failed with status"
238 );
239
240 if status.is_client_error() && status.as_u16() != 429 {
242 return Err(SubscriptionError::Internal(format!(
243 "Webhook delivery failed: {status}"
244 )));
245 }
246 },
247 Err(e) => {
248 tracing::warn!(
249 url = %self.config.url,
250 event_id = %event.event_id,
251 error = %e,
252 attempt = attempt,
253 "Webhook delivery error"
254 );
255 },
256 }
257
258 if attempt >= self.config.max_retries {
260 return Err(SubscriptionError::Internal(format!(
261 "Webhook delivery failed after {} attempts",
262 attempt
263 )));
264 }
265
266 tracing::debug!(delay_ms = delay, "Retrying webhook delivery");
268 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
269 delay *= 2;
270 }
271 }
272
273 fn name(&self) -> &'static str {
274 "webhook"
275 }
276
277 async fn health_check(&self) -> bool {
278 match self.client.head(&self.config.url).send().await {
280 Ok(response) => response.status().is_success() || response.status().as_u16() == 405,
281 Err(_) => false,
282 }
283 }
284}
285
286impl std::fmt::Debug for WebhookAdapter {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 f.debug_struct("WebhookAdapter")
289 .field("url", &self.config.url)
290 .field("has_secret", &self.config.secret.is_some())
291 .finish_non_exhaustive()
292 }
293}