Skip to main content

pylon_plugin/builtin/
webhooks.rs

1use std::sync::Mutex;
2
3use crate::Plugin;
4use pylon_auth::AuthContext;
5use serde_json::Value;
6
7use super::net_guard::is_private_ip;
8
9/// How the webhook plugin handles delivery.
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DeliveryMode {
12    /// Log webhook events only (no HTTP request).
13    Log,
14    /// Deliver via HTTP POST (no local log).
15    Deliver,
16    /// Log and deliver.
17    Both,
18}
19
20/// A webhook registration.
21#[derive(Clone)]
22pub struct WebhookConfig {
23    /// URL to POST to.
24    pub url: String,
25    /// Entity to watch. None = all entities.
26    pub entity: Option<String>,
27    /// Events to fire on. Empty = all events.
28    pub events: Vec<String>,
29    /// Optional secret for HMAC signing.
30    pub secret: Option<String>,
31}
32
33/// Webhooks plugin. Fires HTTP POST callbacks on entity changes.
34pub struct WebhooksPlugin {
35    hooks: Vec<WebhookConfig>,
36    log: Mutex<Vec<WebhookEvent>>,
37    delivery_log: Mutex<Vec<DeliveryAttempt>>,
38    max_log: usize,
39    mode: DeliveryMode,
40}
41
42#[derive(Debug, Clone)]
43pub struct WebhookEvent {
44    pub url: String,
45    pub entity: String,
46    pub event: String,
47    pub row_id: String,
48    pub status: String,
49}
50
51/// A single delivery attempt record.
52#[derive(Debug, Clone)]
53pub struct DeliveryAttempt {
54    pub url: String,
55    pub status: u16,
56    pub success: bool,
57    pub timestamp: String,
58    pub error: Option<String>,
59}
60
61fn now() -> String {
62    use std::time::{SystemTime, UNIX_EPOCH};
63    let ts = SystemTime::now()
64        .duration_since(UNIX_EPOCH)
65        .unwrap_or_default();
66    format!("{}.{:03}", ts.as_secs(), ts.subsec_millis())
67}
68
69/// Actually deliver a webhook via HTTP POST.
70fn deliver(url: &str, payload: &str) -> Result<u16, String> {
71    use std::io::{Read, Write};
72    use std::net::TcpStream;
73    use std::time::Duration;
74
75    // Parse URL: http://host:port/path
76    let url = url
77        .strip_prefix("http://")
78        .ok_or("Only http:// URLs supported")?;
79    let (host_port, path) = match url.find('/') {
80        Some(i) => (&url[..i], &url[i..]),
81        None => (url, "/"),
82    };
83
84    // SSRF protection: block connections to private/reserved IP ranges.
85    if is_private_ip(host_port) {
86        return Err("Connection to private/reserved IP addresses is not allowed".into());
87    }
88
89    let mut stream =
90        TcpStream::connect(host_port).map_err(|e| format!("Connection failed: {e}"))?;
91    stream.set_write_timeout(Some(Duration::from_secs(10))).ok();
92    stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
93
94    let request = format!(
95        "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
96        path, host_port, payload.len(), payload
97    );
98
99    stream
100        .write_all(request.as_bytes())
101        .map_err(|e| format!("Write failed: {e}"))?;
102
103    let mut response = String::new();
104    stream.read_to_string(&mut response).ok();
105
106    // Parse status code from response.
107    let status: u16 = response
108        .lines()
109        .next()
110        .and_then(|line| line.split_whitespace().nth(1))
111        .and_then(|s| s.parse().ok())
112        .unwrap_or(0);
113
114    Ok(status)
115}
116
117impl WebhooksPlugin {
118    pub fn new() -> Self {
119        Self {
120            hooks: Vec::new(),
121            log: Mutex::new(Vec::new()),
122            delivery_log: Mutex::new(Vec::new()),
123            max_log: 100,
124            mode: DeliveryMode::Log,
125        }
126    }
127
128    /// Create a plugin with the specified delivery mode.
129    pub fn with_mode(mode: DeliveryMode) -> Self {
130        Self {
131            hooks: Vec::new(),
132            log: Mutex::new(Vec::new()),
133            delivery_log: Mutex::new(Vec::new()),
134            max_log: 100,
135            mode,
136        }
137    }
138
139    pub fn add(&mut self, config: WebhookConfig) {
140        self.hooks.push(config);
141    }
142
143    pub fn log(&self) -> Vec<WebhookEvent> {
144        self.log.lock().unwrap().clone()
145    }
146
147    /// Return all delivery attempts.
148    pub fn delivery_history(&self) -> Vec<DeliveryAttempt> {
149        self.delivery_log.lock().unwrap().clone()
150    }
151
152    fn fire(&self, entity: &str, event: &str, row_id: &str, data: Option<&Value>) {
153        for hook in &self.hooks {
154            let entity_match = hook.entity.as_deref().map(|e| e == entity).unwrap_or(true);
155            let event_match = hook.events.is_empty() || hook.events.iter().any(|e| e == event);
156
157            if entity_match && event_match {
158                let payload = serde_json::json!({
159                    "event": event,
160                    "entity": entity,
161                    "row_id": row_id,
162                    "data": data,
163                });
164
165                let should_log = matches!(self.mode, DeliveryMode::Log | DeliveryMode::Both);
166                let should_deliver =
167                    matches!(self.mode, DeliveryMode::Deliver | DeliveryMode::Both);
168
169                // Build the log status from either a real delivery or the old stub.
170                let status = if should_deliver {
171                    let url = hook.url.clone();
172                    let payload_str = payload.to_string();
173                    let timestamp = now();
174
175                    // Deliver in a separate thread so we don't block the caller.
176                    let result = {
177                        let url_clone = url.clone();
178                        let payload_clone = payload_str.clone();
179                        std::thread::spawn(move || deliver(&url_clone, &payload_clone))
180                            .join()
181                            .unwrap_or_else(|_| Err("Thread panicked".into()))
182                    };
183
184                    let attempt = match &result {
185                        Ok(code) => DeliveryAttempt {
186                            url: url.clone(),
187                            status: *code,
188                            success: (200..300).contains(code),
189                            timestamp,
190                            error: None,
191                        },
192                        Err(e) => DeliveryAttempt {
193                            url: url.clone(),
194                            status: 0,
195                            success: false,
196                            timestamp,
197                            error: Some(e.clone()),
198                        },
199                    };
200
201                    let mut dlog = self.delivery_log.lock().unwrap();
202                    dlog.push(attempt);
203                    let excess = dlog.len().saturating_sub(self.max_log);
204                    if excess > 0 {
205                        dlog.drain(0..excess);
206                    }
207
208                    match result {
209                        Ok(code) => format!("{code}"),
210                        Err(e) => format!("error: {e}"),
211                    }
212                } else {
213                    // Log-only mode: no real HTTP call.
214                    "200".to_string()
215                };
216
217                if should_log {
218                    let mut log = self.log.lock().unwrap();
219                    log.push(WebhookEvent {
220                        url: hook.url.clone(),
221                        entity: entity.to_string(),
222                        event: event.to_string(),
223                        row_id: row_id.to_string(),
224                        status,
225                    });
226                    let excess = log.len().saturating_sub(self.max_log);
227                    if excess > 0 {
228                        log.drain(0..excess);
229                    }
230                }
231            }
232        }
233    }
234}
235
236impl Plugin for WebhooksPlugin {
237    fn name(&self) -> &str {
238        "webhooks"
239    }
240
241    fn after_insert(&self, entity: &str, id: &str, data: &Value, _auth: &AuthContext) {
242        self.fire(entity, "insert", id, Some(data));
243    }
244
245    fn after_update(&self, entity: &str, id: &str, data: &Value, _auth: &AuthContext) {
246        self.fire(entity, "update", id, Some(data));
247    }
248
249    fn after_delete(&self, entity: &str, id: &str, _auth: &AuthContext) {
250        self.fire(entity, "delete", id, None);
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257
258    #[test]
259    fn fires_on_insert() {
260        let mut plugin = WebhooksPlugin::new();
261        plugin.add(WebhookConfig {
262            url: "https://example.com/webhook".into(),
263            entity: None,
264            events: vec![],
265            secret: None,
266        });
267
268        plugin.after_insert(
269            "Todo",
270            "t1",
271            &serde_json::json!({"title": "Test"}),
272            &AuthContext::anonymous(),
273        );
274        assert_eq!(plugin.log().len(), 1);
275        assert_eq!(plugin.log()[0].event, "insert");
276        assert_eq!(plugin.log()[0].entity, "Todo");
277    }
278
279    #[test]
280    fn filters_by_entity() {
281        let mut plugin = WebhooksPlugin::new();
282        plugin.add(WebhookConfig {
283            url: "https://example.com/webhook".into(),
284            entity: Some("Todo".into()),
285            events: vec![],
286            secret: None,
287        });
288
289        plugin.after_insert(
290            "User",
291            "u1",
292            &serde_json::json!({}),
293            &AuthContext::anonymous(),
294        );
295        assert_eq!(plugin.log().len(), 0); // User doesn't match
296
297        plugin.after_insert(
298            "Todo",
299            "t1",
300            &serde_json::json!({}),
301            &AuthContext::anonymous(),
302        );
303        assert_eq!(plugin.log().len(), 1);
304    }
305
306    #[test]
307    fn filters_by_event() {
308        let mut plugin = WebhooksPlugin::new();
309        plugin.add(WebhookConfig {
310            url: "https://example.com/webhook".into(),
311            entity: None,
312            events: vec!["delete".into()],
313            secret: None,
314        });
315
316        plugin.after_insert(
317            "Todo",
318            "t1",
319            &serde_json::json!({}),
320            &AuthContext::anonymous(),
321        );
322        assert_eq!(plugin.log().len(), 0); // insert doesn't match
323
324        plugin.after_delete("Todo", "t1", &AuthContext::anonymous());
325        assert_eq!(plugin.log().len(), 1);
326    }
327
328    #[test]
329    fn trims_log() {
330        let mut plugin = WebhooksPlugin::new();
331        plugin.max_log = 2;
332        plugin.add(WebhookConfig {
333            url: "x".into(),
334            entity: None,
335            events: vec![],
336            secret: None,
337        });
338
339        let auth = AuthContext::anonymous();
340        plugin.after_insert("A", "1", &serde_json::json!({}), &auth);
341        plugin.after_insert("A", "2", &serde_json::json!({}), &auth);
342        plugin.after_insert("A", "3", &serde_json::json!({}), &auth);
343
344        assert_eq!(plugin.log().len(), 2);
345    }
346
347    // --- Delivery mode tests ---
348
349    #[test]
350    fn delivery_mode_enum_values() {
351        assert_ne!(DeliveryMode::Log, DeliveryMode::Deliver);
352        assert_ne!(DeliveryMode::Deliver, DeliveryMode::Both);
353        assert_eq!(DeliveryMode::Log, DeliveryMode::Log);
354    }
355
356    #[test]
357    fn with_mode_sets_mode() {
358        let plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
359        assert_eq!(plugin.mode, DeliveryMode::Deliver);
360    }
361
362    #[test]
363    fn log_mode_does_not_populate_delivery_history() {
364        let mut plugin = WebhooksPlugin::new(); // defaults to Log
365        plugin.add(WebhookConfig {
366            url: "http://localhost:9999/hook".into(),
367            entity: None,
368            events: vec![],
369            secret: None,
370        });
371
372        plugin.after_insert(
373            "Todo",
374            "t1",
375            &serde_json::json!({}),
376            &AuthContext::anonymous(),
377        );
378        assert_eq!(plugin.delivery_history().len(), 0);
379        assert_eq!(plugin.log().len(), 1);
380    }
381
382    #[test]
383    fn deliver_mode_blocks_private_ip() {
384        let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
385        plugin.add(WebhookConfig {
386            url: "http://127.0.0.1:19999/hook".into(),
387            entity: None,
388            events: vec![],
389            secret: None,
390        });
391
392        plugin.after_insert(
393            "Todo",
394            "t1",
395            &serde_json::json!({}),
396            &AuthContext::anonymous(),
397        );
398
399        let history = plugin.delivery_history();
400        assert_eq!(history.len(), 1);
401        assert!(!history[0].success);
402        assert!(history[0]
403            .error
404            .as_ref()
405            .unwrap()
406            .contains("private/reserved"));
407    }
408
409    #[test]
410    fn both_mode_populates_log_and_delivery_history() {
411        let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Both);
412        plugin.add(WebhookConfig {
413            url: "http://127.0.0.1:19999/hook".into(),
414            entity: None,
415            events: vec![],
416            secret: None,
417        });
418
419        plugin.after_insert(
420            "Todo",
421            "t1",
422            &serde_json::json!({}),
423            &AuthContext::anonymous(),
424        );
425
426        assert_eq!(plugin.delivery_history().len(), 1);
427        assert_eq!(plugin.log().len(), 1);
428    }
429
430    #[test]
431    fn delivery_attempt_tracks_url() {
432        let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
433        plugin.add(WebhookConfig {
434            url: "http://127.0.0.1:19999/my-hook".into(),
435            entity: None,
436            events: vec![],
437            secret: None,
438        });
439
440        plugin.after_insert(
441            "Todo",
442            "t1",
443            &serde_json::json!({}),
444            &AuthContext::anonymous(),
445        );
446
447        let history = plugin.delivery_history();
448        assert_eq!(history[0].url, "http://127.0.0.1:19999/my-hook");
449    }
450
451    #[test]
452    fn delivery_history_trimmed_to_max_log() {
453        let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
454        plugin.max_log = 2;
455        plugin.add(WebhookConfig {
456            url: "http://127.0.0.1:19999/hook".into(),
457            entity: None,
458            events: vec![],
459            secret: None,
460        });
461
462        let auth = AuthContext::anonymous();
463        plugin.after_insert("A", "1", &serde_json::json!({}), &auth);
464        plugin.after_insert("A", "2", &serde_json::json!({}), &auth);
465        plugin.after_insert("A", "3", &serde_json::json!({}), &auth);
466
467        assert_eq!(plugin.delivery_history().len(), 2);
468    }
469
470    // --- URL parsing tests ---
471
472    #[test]
473    fn deliver_rejects_non_http() {
474        let result = deliver("https://example.com/path", "{}");
475        assert!(result.is_err());
476        assert!(result.unwrap_err().contains("Only http://"));
477    }
478
479    #[test]
480    fn deliver_blocks_private_ip_addresses() {
481        // 127.0.0.1 (loopback)
482        let result = deliver("http://127.0.0.1:19999/webhook/path", "{}");
483        assert!(result.is_err());
484        assert!(result.unwrap_err().contains("private/reserved"));
485
486        // 10.x.x.x
487        let result = deliver("http://10.0.0.1:80/hook", "{}");
488        assert!(result.is_err());
489        assert!(result.unwrap_err().contains("private/reserved"));
490
491        // 172.16.x.x
492        let result = deliver("http://172.16.0.1:80/hook", "{}");
493        assert!(result.is_err());
494        assert!(result.unwrap_err().contains("private/reserved"));
495
496        // 192.168.x.x
497        let result = deliver("http://192.168.1.1:80/hook", "{}");
498        assert!(result.is_err());
499        assert!(result.unwrap_err().contains("private/reserved"));
500
501        // 169.254.x.x (AWS metadata)
502        let result = deliver("http://169.254.169.254/latest/meta-data/", "{}");
503        assert!(result.is_err());
504        assert!(result.unwrap_err().contains("private/reserved"));
505
506        // localhost
507        let result = deliver("http://localhost:9999/hook", "{}");
508        assert!(result.is_err());
509        assert!(result.unwrap_err().contains("private/reserved"));
510    }
511
512    #[test]
513    fn deliver_parses_url_without_path() {
514        // Public IP -- will fail to connect but passes the SSRF check.
515        let result = deliver("http://203.0.113.1:19999", "{}");
516        assert!(result.is_err());
517        assert!(result.unwrap_err().contains("Connection failed"));
518    }
519}