1use std::sync::Mutex;
2
3use crate::Plugin;
4use pylon_auth::AuthContext;
5use serde_json::Value;
6
7use super::net_guard::is_private_ip;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DeliveryMode {
12 Log,
14 Deliver,
16 Both,
18}
19
20#[derive(Clone)]
22pub struct WebhookConfig {
23 pub url: String,
25 pub entity: Option<String>,
27 pub events: Vec<String>,
29 pub secret: Option<String>,
31}
32
33pub struct WebhooksPlugin {
35 hooks: Vec<WebhookConfig>,
36 log: Mutex<Vec<WebhookEvent>>,
37 delivery_log: Mutex<Vec<DeliveryAttempt>>,
38 max_log: usize,
39 mode: DeliveryMode,
40}
41
42#[derive(Debug, Clone)]
43pub struct WebhookEvent {
44 pub url: String,
45 pub entity: String,
46 pub event: String,
47 pub row_id: String,
48 pub status: String,
49}
50
51#[derive(Debug, Clone)]
53pub struct DeliveryAttempt {
54 pub url: String,
55 pub status: u16,
56 pub success: bool,
57 pub timestamp: String,
58 pub error: Option<String>,
59}
60
61fn now() -> String {
62 use std::time::{SystemTime, UNIX_EPOCH};
63 let ts = SystemTime::now()
64 .duration_since(UNIX_EPOCH)
65 .unwrap_or_default();
66 format!("{}.{:03}", ts.as_secs(), ts.subsec_millis())
67}
68
69fn deliver(url: &str, payload: &str) -> Result<u16, String> {
71 use std::io::{Read, Write};
72 use std::net::TcpStream;
73 use std::time::Duration;
74
75 let url = url
77 .strip_prefix("http://")
78 .ok_or("Only http:// URLs supported")?;
79 let (host_port, path) = match url.find('/') {
80 Some(i) => (&url[..i], &url[i..]),
81 None => (url, "/"),
82 };
83
84 if is_private_ip(host_port) {
86 return Err("Connection to private/reserved IP addresses is not allowed".into());
87 }
88
89 let mut stream =
90 TcpStream::connect(host_port).map_err(|e| format!("Connection failed: {e}"))?;
91 stream.set_write_timeout(Some(Duration::from_secs(10))).ok();
92 stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
93
94 let request = format!(
95 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
96 path, host_port, payload.len(), payload
97 );
98
99 stream
100 .write_all(request.as_bytes())
101 .map_err(|e| format!("Write failed: {e}"))?;
102
103 let mut response = String::new();
104 stream.read_to_string(&mut response).ok();
105
106 let status: u16 = response
108 .lines()
109 .next()
110 .and_then(|line| line.split_whitespace().nth(1))
111 .and_then(|s| s.parse().ok())
112 .unwrap_or(0);
113
114 Ok(status)
115}
116
117impl WebhooksPlugin {
118 pub fn new() -> Self {
119 Self {
120 hooks: Vec::new(),
121 log: Mutex::new(Vec::new()),
122 delivery_log: Mutex::new(Vec::new()),
123 max_log: 100,
124 mode: DeliveryMode::Log,
125 }
126 }
127
128 pub fn with_mode(mode: DeliveryMode) -> Self {
130 Self {
131 hooks: Vec::new(),
132 log: Mutex::new(Vec::new()),
133 delivery_log: Mutex::new(Vec::new()),
134 max_log: 100,
135 mode,
136 }
137 }
138
139 pub fn add(&mut self, config: WebhookConfig) {
140 self.hooks.push(config);
141 }
142
143 pub fn log(&self) -> Vec<WebhookEvent> {
144 self.log.lock().unwrap().clone()
145 }
146
147 pub fn delivery_history(&self) -> Vec<DeliveryAttempt> {
149 self.delivery_log.lock().unwrap().clone()
150 }
151
152 fn fire(&self, entity: &str, event: &str, row_id: &str, data: Option<&Value>) {
153 for hook in &self.hooks {
154 let entity_match = hook.entity.as_deref().map(|e| e == entity).unwrap_or(true);
155 let event_match = hook.events.is_empty() || hook.events.iter().any(|e| e == event);
156
157 if entity_match && event_match {
158 let payload = serde_json::json!({
159 "event": event,
160 "entity": entity,
161 "row_id": row_id,
162 "data": data,
163 });
164
165 let should_log = matches!(self.mode, DeliveryMode::Log | DeliveryMode::Both);
166 let should_deliver =
167 matches!(self.mode, DeliveryMode::Deliver | DeliveryMode::Both);
168
169 let status = if should_deliver {
171 let url = hook.url.clone();
172 let payload_str = payload.to_string();
173 let timestamp = now();
174
175 let result = {
177 let url_clone = url.clone();
178 let payload_clone = payload_str.clone();
179 std::thread::spawn(move || deliver(&url_clone, &payload_clone))
180 .join()
181 .unwrap_or_else(|_| Err("Thread panicked".into()))
182 };
183
184 let attempt = match &result {
185 Ok(code) => DeliveryAttempt {
186 url: url.clone(),
187 status: *code,
188 success: (200..300).contains(code),
189 timestamp,
190 error: None,
191 },
192 Err(e) => DeliveryAttempt {
193 url: url.clone(),
194 status: 0,
195 success: false,
196 timestamp,
197 error: Some(e.clone()),
198 },
199 };
200
201 let mut dlog = self.delivery_log.lock().unwrap();
202 dlog.push(attempt);
203 let excess = dlog.len().saturating_sub(self.max_log);
204 if excess > 0 {
205 dlog.drain(0..excess);
206 }
207
208 match result {
209 Ok(code) => format!("{code}"),
210 Err(e) => format!("error: {e}"),
211 }
212 } else {
213 "200".to_string()
215 };
216
217 if should_log {
218 let mut log = self.log.lock().unwrap();
219 log.push(WebhookEvent {
220 url: hook.url.clone(),
221 entity: entity.to_string(),
222 event: event.to_string(),
223 row_id: row_id.to_string(),
224 status,
225 });
226 let excess = log.len().saturating_sub(self.max_log);
227 if excess > 0 {
228 log.drain(0..excess);
229 }
230 }
231 }
232 }
233 }
234}
235
236impl Plugin for WebhooksPlugin {
237 fn name(&self) -> &str {
238 "webhooks"
239 }
240
241 fn after_insert(&self, entity: &str, id: &str, data: &Value, _auth: &AuthContext) {
242 self.fire(entity, "insert", id, Some(data));
243 }
244
245 fn after_update(&self, entity: &str, id: &str, data: &Value, _auth: &AuthContext) {
246 self.fire(entity, "update", id, Some(data));
247 }
248
249 fn after_delete(&self, entity: &str, id: &str, _auth: &AuthContext) {
250 self.fire(entity, "delete", id, None);
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257
258 #[test]
259 fn fires_on_insert() {
260 let mut plugin = WebhooksPlugin::new();
261 plugin.add(WebhookConfig {
262 url: "https://example.com/webhook".into(),
263 entity: None,
264 events: vec![],
265 secret: None,
266 });
267
268 plugin.after_insert(
269 "Todo",
270 "t1",
271 &serde_json::json!({"title": "Test"}),
272 &AuthContext::anonymous(),
273 );
274 assert_eq!(plugin.log().len(), 1);
275 assert_eq!(plugin.log()[0].event, "insert");
276 assert_eq!(plugin.log()[0].entity, "Todo");
277 }
278
279 #[test]
280 fn filters_by_entity() {
281 let mut plugin = WebhooksPlugin::new();
282 plugin.add(WebhookConfig {
283 url: "https://example.com/webhook".into(),
284 entity: Some("Todo".into()),
285 events: vec![],
286 secret: None,
287 });
288
289 plugin.after_insert(
290 "User",
291 "u1",
292 &serde_json::json!({}),
293 &AuthContext::anonymous(),
294 );
295 assert_eq!(plugin.log().len(), 0); plugin.after_insert(
298 "Todo",
299 "t1",
300 &serde_json::json!({}),
301 &AuthContext::anonymous(),
302 );
303 assert_eq!(plugin.log().len(), 1);
304 }
305
306 #[test]
307 fn filters_by_event() {
308 let mut plugin = WebhooksPlugin::new();
309 plugin.add(WebhookConfig {
310 url: "https://example.com/webhook".into(),
311 entity: None,
312 events: vec!["delete".into()],
313 secret: None,
314 });
315
316 plugin.after_insert(
317 "Todo",
318 "t1",
319 &serde_json::json!({}),
320 &AuthContext::anonymous(),
321 );
322 assert_eq!(plugin.log().len(), 0); plugin.after_delete("Todo", "t1", &AuthContext::anonymous());
325 assert_eq!(plugin.log().len(), 1);
326 }
327
328 #[test]
329 fn trims_log() {
330 let mut plugin = WebhooksPlugin::new();
331 plugin.max_log = 2;
332 plugin.add(WebhookConfig {
333 url: "x".into(),
334 entity: None,
335 events: vec![],
336 secret: None,
337 });
338
339 let auth = AuthContext::anonymous();
340 plugin.after_insert("A", "1", &serde_json::json!({}), &auth);
341 plugin.after_insert("A", "2", &serde_json::json!({}), &auth);
342 plugin.after_insert("A", "3", &serde_json::json!({}), &auth);
343
344 assert_eq!(plugin.log().len(), 2);
345 }
346
347 #[test]
350 fn delivery_mode_enum_values() {
351 assert_ne!(DeliveryMode::Log, DeliveryMode::Deliver);
352 assert_ne!(DeliveryMode::Deliver, DeliveryMode::Both);
353 assert_eq!(DeliveryMode::Log, DeliveryMode::Log);
354 }
355
356 #[test]
357 fn with_mode_sets_mode() {
358 let plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
359 assert_eq!(plugin.mode, DeliveryMode::Deliver);
360 }
361
362 #[test]
363 fn log_mode_does_not_populate_delivery_history() {
364 let mut plugin = WebhooksPlugin::new(); plugin.add(WebhookConfig {
366 url: "http://localhost:9999/hook".into(),
367 entity: None,
368 events: vec![],
369 secret: None,
370 });
371
372 plugin.after_insert(
373 "Todo",
374 "t1",
375 &serde_json::json!({}),
376 &AuthContext::anonymous(),
377 );
378 assert_eq!(plugin.delivery_history().len(), 0);
379 assert_eq!(plugin.log().len(), 1);
380 }
381
382 #[test]
383 fn deliver_mode_blocks_private_ip() {
384 let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
385 plugin.add(WebhookConfig {
386 url: "http://127.0.0.1:19999/hook".into(),
387 entity: None,
388 events: vec![],
389 secret: None,
390 });
391
392 plugin.after_insert(
393 "Todo",
394 "t1",
395 &serde_json::json!({}),
396 &AuthContext::anonymous(),
397 );
398
399 let history = plugin.delivery_history();
400 assert_eq!(history.len(), 1);
401 assert!(!history[0].success);
402 assert!(history[0]
403 .error
404 .as_ref()
405 .unwrap()
406 .contains("private/reserved"));
407 }
408
409 #[test]
410 fn both_mode_populates_log_and_delivery_history() {
411 let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Both);
412 plugin.add(WebhookConfig {
413 url: "http://127.0.0.1:19999/hook".into(),
414 entity: None,
415 events: vec![],
416 secret: None,
417 });
418
419 plugin.after_insert(
420 "Todo",
421 "t1",
422 &serde_json::json!({}),
423 &AuthContext::anonymous(),
424 );
425
426 assert_eq!(plugin.delivery_history().len(), 1);
427 assert_eq!(plugin.log().len(), 1);
428 }
429
430 #[test]
431 fn delivery_attempt_tracks_url() {
432 let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
433 plugin.add(WebhookConfig {
434 url: "http://127.0.0.1:19999/my-hook".into(),
435 entity: None,
436 events: vec![],
437 secret: None,
438 });
439
440 plugin.after_insert(
441 "Todo",
442 "t1",
443 &serde_json::json!({}),
444 &AuthContext::anonymous(),
445 );
446
447 let history = plugin.delivery_history();
448 assert_eq!(history[0].url, "http://127.0.0.1:19999/my-hook");
449 }
450
451 #[test]
452 fn delivery_history_trimmed_to_max_log() {
453 let mut plugin = WebhooksPlugin::with_mode(DeliveryMode::Deliver);
454 plugin.max_log = 2;
455 plugin.add(WebhookConfig {
456 url: "http://127.0.0.1:19999/hook".into(),
457 entity: None,
458 events: vec![],
459 secret: None,
460 });
461
462 let auth = AuthContext::anonymous();
463 plugin.after_insert("A", "1", &serde_json::json!({}), &auth);
464 plugin.after_insert("A", "2", &serde_json::json!({}), &auth);
465 plugin.after_insert("A", "3", &serde_json::json!({}), &auth);
466
467 assert_eq!(plugin.delivery_history().len(), 2);
468 }
469
470 #[test]
473 fn deliver_rejects_non_http() {
474 let result = deliver("https://example.com/path", "{}");
475 assert!(result.is_err());
476 assert!(result.unwrap_err().contains("Only http://"));
477 }
478
479 #[test]
480 fn deliver_blocks_private_ip_addresses() {
481 let result = deliver("http://127.0.0.1:19999/webhook/path", "{}");
483 assert!(result.is_err());
484 assert!(result.unwrap_err().contains("private/reserved"));
485
486 let result = deliver("http://10.0.0.1:80/hook", "{}");
488 assert!(result.is_err());
489 assert!(result.unwrap_err().contains("private/reserved"));
490
491 let result = deliver("http://172.16.0.1:80/hook", "{}");
493 assert!(result.is_err());
494 assert!(result.unwrap_err().contains("private/reserved"));
495
496 let result = deliver("http://192.168.1.1:80/hook", "{}");
498 assert!(result.is_err());
499 assert!(result.unwrap_err().contains("private/reserved"));
500
501 let result = deliver("http://169.254.169.254/latest/meta-data/", "{}");
503 assert!(result.is_err());
504 assert!(result.unwrap_err().contains("private/reserved"));
505
506 let result = deliver("http://localhost:9999/hook", "{}");
508 assert!(result.is_err());
509 assert!(result.unwrap_err().contains("private/reserved"));
510 }
511
512 #[test]
513 fn deliver_parses_url_without_path() {
514 let result = deliver("http://203.0.113.1:19999", "{}");
516 assert!(result.is_err());
517 assert!(result.unwrap_err().contains("Connection failed"));
518 }
519}