Skip to main content

fraiseql_server/subscriptions/
webhook_lifecycle.rs

1//! Webhook-based subscription lifecycle hooks.
2//!
3//! Sends HTTP POST requests to configured URLs on subscription lifecycle events.
4//! `on_connect` and `on_subscribe` are fail-closed (timeout → reject).
5//! `on_disconnect` and `on_unsubscribe` are fire-and-forget.
6
7use std::time::Duration;
8
9use async_trait::async_trait;
10use tracing::{error, warn};
11
12use super::lifecycle::SubscriptionLifecycle;
13
14/// Maximum byte size accepted from a webhook response body.
15///
16/// Webhook responses are used only as rejection error messages, so 64 `KiB` is
17/// more than sufficient for any human-readable reason string.  Capping here
18/// prevents a misbehaving or malicious webhook server from sending a multi-GB
19/// body that exhausts server memory.
20const MAX_WEBHOOK_RESPONSE_BYTES: usize = 64 * 1024; // 64 KiB
21
22/// Subscription lifecycle hooks that call external HTTP endpoints.
23pub struct WebhookLifecycle {
24    client:             reqwest::Client,
25    on_connect_url:     Option<String>,
26    on_disconnect_url:  Option<String>,
27    on_subscribe_url:   Option<String>,
28    on_unsubscribe_url: Option<String>,
29    #[allow(dead_code)] // Reason: kept for future use in fail-closed unsubscribe logic.
30    timeout: Duration,
31}
32
33impl WebhookLifecycle {
34    /// Create a new webhook lifecycle from configured URLs.
35    ///
36    /// `timeout_ms` controls the maximum time to wait for `on_connect` and
37    /// `on_subscribe` responses. `on_disconnect` and `on_unsubscribe` are
38    /// fire-and-forget (timeout is irrelevant for those hooks).
39    #[must_use]
40    pub fn new(
41        on_connect_url: Option<String>,
42        on_disconnect_url: Option<String>,
43        on_subscribe_url: Option<String>,
44        on_unsubscribe_url: Option<String>,
45        timeout_ms: u64,
46    ) -> Self {
47        let timeout = Duration::from_millis(timeout_ms);
48        // Redirects are disabled to prevent redirect-chain SSRF attacks.
49        // `https_only` is intentionally not set here because webhook URLs
50        // may legitimately use plain HTTP in development/staging environments.
51        let client = reqwest::Client::builder()
52            .redirect(reqwest::redirect::Policy::none())
53            .timeout(timeout)
54            .build()
55            .unwrap_or_else(|e| {
56                warn!(
57                    error = %e,
58                    "Failed to build reqwest client with timeout; using default client. \
59                     Webhook lifecycle hooks may not respect the configured timeout."
60                );
61                reqwest::Client::default()
62            });
63        Self {
64            client,
65            on_connect_url,
66            on_disconnect_url,
67            on_subscribe_url,
68            on_unsubscribe_url,
69            timeout,
70        }
71    }
72
73    /// Build from typed subscriptions configuration.
74    ///
75    /// Returns `None` if no hooks are configured.
76    #[must_use]
77    pub fn from_config(config: &fraiseql_core::schema::SubscriptionsConfig) -> Option<Self> {
78        let hooks = config.hooks.as_ref()?;
79        if hooks.on_connect.is_none()
80            && hooks.on_disconnect.is_none()
81            && hooks.on_subscribe.is_none()
82            && hooks.on_unsubscribe.is_none()
83        {
84            return None;
85        }
86        Some(Self::new(
87            hooks.on_connect.clone(),
88            hooks.on_disconnect.clone(),
89            hooks.on_subscribe.clone(),
90            hooks.on_unsubscribe.clone(),
91            hooks.timeout_ms,
92        ))
93    }
94
95    /// Build from compiled schema JSON (`subscriptions.hooks` section).
96    ///
97    /// Returns `None` if no hooks are configured.
98    #[must_use]
99    pub fn from_schema_json(subscriptions: &serde_json::Value) -> Option<Self> {
100        let hooks = subscriptions.get("hooks")?;
101        let on_connect = hooks.get("on_connect").and_then(|v| v.as_str()).map(String::from);
102        let on_disconnect = hooks.get("on_disconnect").and_then(|v| v.as_str()).map(String::from);
103        let on_subscribe = hooks.get("on_subscribe").and_then(|v| v.as_str()).map(String::from);
104        let on_unsubscribe = hooks.get("on_unsubscribe").and_then(|v| v.as_str()).map(String::from);
105
106        // If no hooks are configured, return None.
107        if on_connect.is_none()
108            && on_disconnect.is_none()
109            && on_subscribe.is_none()
110            && on_unsubscribe.is_none()
111        {
112            return None;
113        }
114
115        let timeout_ms = hooks.get("timeout_ms").and_then(|v| v.as_u64()).unwrap_or(500);
116
117        Some(Self::new(on_connect, on_disconnect, on_subscribe, on_unsubscribe, timeout_ms))
118    }
119}
120
121// Reason: SubscriptionLifecycle is defined with #[async_trait]; all implementations must match
122// its transformed method signatures to satisfy the trait contract
123// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
124#[async_trait]
125impl SubscriptionLifecycle for WebhookLifecycle {
126    /// # Errors
127    ///
128    /// Returns an error string if the HTTP POST to the `on_connect` webhook URL fails
129    /// or returns a non-2xx status code.
130    async fn on_connect(
131        &self,
132        params: &serde_json::Value,
133        connection_id: &str,
134    ) -> Result<(), String> {
135        let Some(ref url) = self.on_connect_url else {
136            return Ok(());
137        };
138
139        let body = serde_json::json!({
140            "event": "connect",
141            "connection_id": connection_id,
142            "params": params,
143        });
144
145        match self.client.post(url).json(&body).send().await {
146            Ok(resp) if resp.status().is_success() => Ok(()),
147            Ok(resp) => {
148                let status = resp.status();
149                let raw = resp
150                    .bytes()
151                    .await
152                    .inspect_err(|e| warn!(url = %url, error = %e, "Failed to read on_connect webhook response body"))
153                    .unwrap_or_default();
154                let capped = &raw[..raw.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
155                let text = String::from_utf8_lossy(capped).into_owned();
156                warn!(
157                    url = %url,
158                    status = %status,
159                    "on_connect webhook rejected connection"
160                );
161                Err(text)
162            },
163            Err(e) => {
164                error!(url = %url, error = %e, "on_connect webhook failed");
165                Err(format!("webhook timeout or error: {e}"))
166            },
167        }
168    }
169
170    async fn on_disconnect(&self, connection_id: &str) {
171        let Some(ref url) = self.on_disconnect_url else {
172            return;
173        };
174
175        let body = serde_json::json!({
176            "event": "disconnect",
177            "connection_id": connection_id,
178        });
179
180        // Fire-and-forget: spawn a task so we don't block the connection cleanup.
181        let client = self.client.clone();
182        let url = url.clone();
183        tokio::spawn(async move {
184            if let Err(e) = client.post(&url).json(&body).send().await {
185                warn!(url = %url, error = %e, "on_disconnect webhook failed");
186            }
187        });
188    }
189
190    /// # Errors
191    ///
192    /// Returns an error string if the HTTP POST to the `on_subscribe` webhook URL fails
193    /// or returns a non-2xx status code.
194    async fn on_subscribe(
195        &self,
196        subscription_name: &str,
197        variables: &serde_json::Value,
198        connection_id: &str,
199    ) -> Result<(), String> {
200        let Some(ref url) = self.on_subscribe_url else {
201            return Ok(());
202        };
203
204        let body = serde_json::json!({
205            "event": "subscribe",
206            "connection_id": connection_id,
207            "subscription_name": subscription_name,
208            "variables": variables,
209        });
210
211        match self.client.post(url).json(&body).send().await {
212            Ok(resp) if resp.status().is_success() => Ok(()),
213            Ok(resp) => {
214                let status = resp.status();
215                let raw = resp
216                    .bytes()
217                    .await
218                    .inspect_err(|e| warn!(url = %url, error = %e, "Failed to read on_subscribe webhook response body"))
219                    .unwrap_or_default();
220                let capped = &raw[..raw.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
221                let text = String::from_utf8_lossy(capped).into_owned();
222                warn!(
223                    url = %url,
224                    status = %status,
225                    "on_subscribe webhook rejected subscription"
226                );
227                Err(text)
228            },
229            Err(e) => {
230                error!(url = %url, error = %e, "on_subscribe webhook failed");
231                Err(format!("webhook timeout or error: {e}"))
232            },
233        }
234    }
235
236    async fn on_unsubscribe(&self, subscription_id: &str, connection_id: &str) {
237        let Some(ref url) = self.on_unsubscribe_url else {
238            return;
239        };
240
241        let body = serde_json::json!({
242            "event": "unsubscribe",
243            "connection_id": connection_id,
244            "subscription_id": subscription_id,
245        });
246
247        let client = self.client.clone();
248        let url = url.clone();
249        tokio::spawn(async move {
250            if let Err(e) = client.post(&url).json(&body).send().await {
251                warn!(url = %url, error = %e, "on_unsubscribe webhook failed");
252            }
253        });
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    #![allow(clippy::unwrap_used)] // Reason: test code, panics acceptable
260    #![allow(clippy::cast_precision_loss)] // Reason: test metrics reporting
261    #![allow(clippy::cast_sign_loss)] // Reason: test data uses small positive integers
262    #![allow(clippy::cast_possible_truncation)] // Reason: test data values are bounded
263    #![allow(clippy::cast_possible_wrap)] // Reason: test data values are bounded
264    #![allow(clippy::missing_panics_doc)] // Reason: test helpers
265    #![allow(clippy::missing_errors_doc)] // Reason: test helpers
266    #![allow(missing_docs)] // Reason: test code
267    #![allow(clippy::items_after_statements)] // Reason: test helpers defined near use site
268
269    use super::*;
270
271    #[test]
272    fn from_schema_json_no_hooks() {
273        let json = serde_json::json!({});
274        assert!(WebhookLifecycle::from_schema_json(&json).is_none());
275    }
276
277    #[test]
278    fn from_schema_json_empty_hooks() {
279        let json = serde_json::json!({"hooks": {}});
280        assert!(WebhookLifecycle::from_schema_json(&json).is_none());
281    }
282
283    #[test]
284    fn from_schema_json_with_connect_url() {
285        let json = serde_json::json!({
286            "hooks": {
287                "on_connect": "http://localhost:8001/hooks/ws-connect",
288                "timeout_ms": 300
289            }
290        });
291        let wh = WebhookLifecycle::from_schema_json(&json).unwrap();
292        assert_eq!(wh.on_connect_url, Some("http://localhost:8001/hooks/ws-connect".to_string()));
293        assert!(wh.on_disconnect_url.is_none());
294        assert!(wh.on_subscribe_url.is_none());
295        assert_eq!(wh.timeout, Duration::from_millis(300));
296    }
297
298    #[test]
299    fn from_schema_json_default_timeout() {
300        let json = serde_json::json!({
301            "hooks": {
302                "on_disconnect": "http://localhost:8001/hooks/ws-disconnect"
303            }
304        });
305        let wh = WebhookLifecycle::from_schema_json(&json).unwrap();
306        assert_eq!(wh.timeout, Duration::from_millis(500));
307    }
308
309    #[test]
310    fn webhook_response_cap_constant_is_reasonable() {
311        // 64 KiB: large enough for any human-readable error, small enough to prevent OOM.
312        assert_eq!(MAX_WEBHOOK_RESPONSE_BYTES, 64 * 1024);
313    }
314
315    #[test]
316    fn webhook_response_body_is_capped_at_limit() {
317        // Simulate what on_connect / on_subscribe do: bytes → cap → lossy UTF-8.
318        let oversized: Vec<u8> = vec![b'x'; MAX_WEBHOOK_RESPONSE_BYTES + 100];
319        let capped = &oversized[..oversized.len().min(MAX_WEBHOOK_RESPONSE_BYTES)];
320        let text = String::from_utf8_lossy(capped).into_owned();
321        assert_eq!(text.len(), MAX_WEBHOOK_RESPONSE_BYTES);
322    }
323}