fraiseql_server/subscriptions/
webhook_lifecycle.rs1use std::time::Duration;
8
9use async_trait::async_trait;
10use tracing::{error, warn};
11
12use super::lifecycle::SubscriptionLifecycle;
13
14const MAX_WEBHOOK_RESPONSE_BYTES: usize = 64 * 1024; pub struct WebhookLifecycle {
24 client: reqwest::Client,
25 on_connect_url: Option<String>,
26 on_disconnect_url: Option<String>,
27 on_subscribe_url: Option<String>,
28 on_unsubscribe_url: Option<String>,
29 #[allow(dead_code)] timeout: Duration,
31}
32
33impl WebhookLifecycle {
34 #[must_use]
40 pub fn new(
41 on_connect_url: Option<String>,
42 on_disconnect_url: Option<String>,
43 on_subscribe_url: Option<String>,
44 on_unsubscribe_url: Option<String>,
45 timeout_ms: u64,
46 ) -> Self {
47 let timeout = Duration::from_millis(timeout_ms);
48 let client = reqwest::Client::builder()
52 .redirect(reqwest::redirect::Policy::none())
53 .timeout(timeout)
54 .build()
55 .unwrap_or_else(|e| {
56 warn!(
57 error = %e,
58 "Failed to build reqwest client with timeout; using default client. \
59 Webhook lifecycle hooks may not respect the configured timeout."
60 );
61 reqwest::Client::default()
62 });
63 Self {
64 client,
65 on_connect_url,
66 on_disconnect_url,
67 on_subscribe_url,
68 on_unsubscribe_url,
69 timeout,
70 }
71 }
72
73 #[must_use]
77 pub fn from_config(config: &fraiseql_core::schema::SubscriptionsConfig) -> Option<Self> {
78 let hooks = config.hooks.as_ref()?;
79 if hooks.on_connect.is_none()
80 && hooks.on_disconnect.is_none()
81 && hooks.on_subscribe.is_none()
82 && hooks.on_unsubscribe.is_none()
83 {
84 return None;
85 }
86 Some(Self::new(
87 hooks.on_connect.clone(),
88 hooks.on_disconnect.clone(),
89 hooks.on_subscribe.clone(),
90 hooks.on_unsubscribe.clone(),
91 hooks.timeout_ms,
92 ))
93 }
94
95 #[must_use]
99 pub fn from_schema_json(subscriptions: &serde_json::Value) -> Option<Self> {
100 let hooks = subscriptions.get("hooks")?;
101 let on_connect = hooks.get("on_connect").and_then(|v| v.as_str()).map(String::from);
102 let on_disconnect = hooks.get("on_disconnect").and_then(|v| v.as_str()).map(String::from);
103 let on_subscribe = hooks.get("on_subscribe").and_then(|v| v.as_str()).map(String::from);
104 let on_unsubscribe = hooks.get("on_unsubscribe").and_then(|v| v.as_str()).map(String::from);
105
106 if on_connect.is_none()
108 && on_disconnect.is_none()
109 && on_subscribe.is_none()
110 && on_unsubscribe.is_none()
111 {
112 return None;
113 }
114
115 let timeout_ms = hooks.get("timeout_ms").and_then(|v| v.as_u64()).unwrap_or(500);
116
117 Some(Self::new(on_connect, on_disconnect, on_subscribe, on_unsubscribe, timeout_ms))
118 }
119}
120
121#[async_trait]
125impl SubscriptionLifecycle for WebhookLifecycle {
126 async fn on_connect(
131 &self,
132 params: &serde_json::Value,
133 connection_id: &str,
134 ) -> Result<(), String> {
135 let Some(ref url) = self.on_connect_url else {
136 return Ok(());
137 };
138
139 let body = serde_json::json!({
140 "event": "connect",
141 "connection_id": connection_id,
142 "params": params,
143 });
144
145 match self.client.post(url).json(&body).send().await {
146 Ok(resp) if resp.status().is_success() => Ok(()),
147 Ok(resp) => {
148 let status = resp.status();
149 let raw = resp
150 .bytes()
151 .await
152 .inspect_err(|e| warn!(url = %url, error = %e, "Failed to read on_connect webhook response body"))
153 .unwrap_or_default();
154 let capped = &raw[..raw.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
155 let text = String::from_utf8_lossy(capped).into_owned();
156 warn!(
157 url = %url,
158 status = %status,
159 "on_connect webhook rejected connection"
160 );
161 Err(text)
162 },
163 Err(e) => {
164 error!(url = %url, error = %e, "on_connect webhook failed");
165 Err(format!("webhook timeout or error: {e}"))
166 },
167 }
168 }
169
170 async fn on_disconnect(&self, connection_id: &str) {
171 let Some(ref url) = self.on_disconnect_url else {
172 return;
173 };
174
175 let body = serde_json::json!({
176 "event": "disconnect",
177 "connection_id": connection_id,
178 });
179
180 let client = self.client.clone();
182 let url = url.clone();
183 tokio::spawn(async move {
184 if let Err(e) = client.post(&url).json(&body).send().await {
185 warn!(url = %url, error = %e, "on_disconnect webhook failed");
186 }
187 });
188 }
189
190 async fn on_subscribe(
195 &self,
196 subscription_name: &str,
197 variables: &serde_json::Value,
198 connection_id: &str,
199 ) -> Result<(), String> {
200 let Some(ref url) = self.on_subscribe_url else {
201 return Ok(());
202 };
203
204 let body = serde_json::json!({
205 "event": "subscribe",
206 "connection_id": connection_id,
207 "subscription_name": subscription_name,
208 "variables": variables,
209 });
210
211 match self.client.post(url).json(&body).send().await {
212 Ok(resp) if resp.status().is_success() => Ok(()),
213 Ok(resp) => {
214 let status = resp.status();
215 let raw = resp
216 .bytes()
217 .await
218 .inspect_err(|e| warn!(url = %url, error = %e, "Failed to read on_subscribe webhook response body"))
219 .unwrap_or_default();
220 let capped = &raw[..raw.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
221 let text = String::from_utf8_lossy(capped).into_owned();
222 warn!(
223 url = %url,
224 status = %status,
225 "on_subscribe webhook rejected subscription"
226 );
227 Err(text)
228 },
229 Err(e) => {
230 error!(url = %url, error = %e, "on_subscribe webhook failed");
231 Err(format!("webhook timeout or error: {e}"))
232 },
233 }
234 }
235
236 async fn on_unsubscribe(&self, subscription_id: &str, connection_id: &str) {
237 let Some(ref url) = self.on_unsubscribe_url else {
238 return;
239 };
240
241 let body = serde_json::json!({
242 "event": "unsubscribe",
243 "connection_id": connection_id,
244 "subscription_id": subscription_id,
245 });
246
247 let client = self.client.clone();
248 let url = url.clone();
249 tokio::spawn(async move {
250 if let Err(e) = client.post(&url).json(&body).send().await {
251 warn!(url = %url, error = %e, "on_unsubscribe webhook failed");
252 }
253 });
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 #![allow(clippy::unwrap_used)] #![allow(clippy::cast_precision_loss)] #![allow(clippy::cast_sign_loss)] #![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_possible_wrap)] #![allow(clippy::missing_panics_doc)] #![allow(clippy::missing_errors_doc)] #![allow(missing_docs)] #![allow(clippy::items_after_statements)] use super::*;
270
271 #[test]
272 fn from_schema_json_no_hooks() {
273 let json = serde_json::json!({});
274 assert!(WebhookLifecycle::from_schema_json(&json).is_none());
275 }
276
277 #[test]
278 fn from_schema_json_empty_hooks() {
279 let json = serde_json::json!({"hooks": {}});
280 assert!(WebhookLifecycle::from_schema_json(&json).is_none());
281 }
282
283 #[test]
284 fn from_schema_json_with_connect_url() {
285 let json = serde_json::json!({
286 "hooks": {
287 "on_connect": "http://localhost:8001/hooks/ws-connect",
288 "timeout_ms": 300
289 }
290 });
291 let wh = WebhookLifecycle::from_schema_json(&json).unwrap();
292 assert_eq!(wh.on_connect_url, Some("http://localhost:8001/hooks/ws-connect".to_string()));
293 assert!(wh.on_disconnect_url.is_none());
294 assert!(wh.on_subscribe_url.is_none());
295 assert_eq!(wh.timeout, Duration::from_millis(300));
296 }
297
298 #[test]
299 fn from_schema_json_default_timeout() {
300 let json = serde_json::json!({
301 "hooks": {
302 "on_disconnect": "http://localhost:8001/hooks/ws-disconnect"
303 }
304 });
305 let wh = WebhookLifecycle::from_schema_json(&json).unwrap();
306 assert_eq!(wh.timeout, Duration::from_millis(500));
307 }
308
309 #[test]
310 fn webhook_response_cap_constant_is_reasonable() {
311 assert_eq!(MAX_WEBHOOK_RESPONSE_BYTES, 64 * 1024);
313 }
314
315 #[test]
316 fn webhook_response_body_is_capped_at_limit() {
317 let oversized: Vec<u8> = vec![b'x'; MAX_WEBHOOK_RESPONSE_BYTES + 100];
319 let capped = &oversized[..oversized.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
320 let text = String::from_utf8_lossy(capped).into_owned();
321 assert_eq!(text.len(), MAX_WEBHOOK_RESPONSE_BYTES);
322 }
323}