Skip to main content

freshblu_server/
webhook.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::future::join_all;
6
7use freshblu_core::{
8    device::Device,
9    forwarder::{ForwarderEntry, ForwarderEvent, MeshbluForwarder, WebhookForwarder},
10    message::{DeviceEvent, Message},
11    token::GenerateTokenOptions,
12};
13use freshblu_store::DynStore;
14use serde_json::Value;
15use tracing::{debug, warn};
16use url::Url;
17use uuid::Uuid;
18
19use crate::bus::DynBus;
20use crate::metrics::{WEBHOOKS_FAILED, WEBHOOKS_SENT};
21
/// Longest `forwarded_from` chain a meshblu forwarder will accept; once the
/// chain reaches this length the event is dropped instead of being re-emitted.
const MAX_FORWARD_DEPTH: usize = 5;
/// Upper bound on forwarder entries processed for a single event; entries
/// beyond this count are ignored.
const MAX_FORWARDERS_PER_EVENT: usize = 10;
24
25pub struct WebhookExecutor {
26    client: reqwest::Client,
27    store: DynStore,
28    bus: DynBus,
29    allow_localhost: bool,
30}
31
32impl WebhookExecutor {
33    pub fn new(store: DynStore, bus: DynBus) -> Self {
34        let client = reqwest::Client::builder()
35            .timeout(Duration::from_secs(10))
36            .build()
37            .expect("reqwest client");
38        Self {
39            client,
40            store,
41            bus,
42            allow_localhost: false,
43        }
44    }
45
46    /// Create an executor that allows localhost URLs (for testing only).
47    #[cfg(test)]
48    pub fn new_with_localhost(store: DynStore, bus: DynBus) -> Self {
49        let mut this = Self::new(store, bus);
50        this.allow_localhost = true;
51        this
52    }
53
54    /// Allow localhost webhooks (for testing).
55    pub fn set_allow_localhost(&mut self, allow: bool) {
56        self.allow_localhost = allow;
57    }
58
59    /// Fire forwarders for a device event. `forwarded_from` tracks UUIDs to detect loops.
60    pub async fn execute(
61        self: &Arc<Self>,
62        device: &Device,
63        event: ForwarderEvent,
64        payload: &Value,
65        forwarded_from: &[Uuid],
66    ) {
67        let forwarders = match &device.meshblu.forwarders {
68            Some(f) => f.get(event),
69            None => return,
70        };
71
72        if forwarders.is_empty() {
73            return;
74        }
75
76        // Cap forwarders per event to prevent abuse
77        // Fire webhooks concurrently, meshblu forwarders sequentially (they mutate state)
78        let mut webhook_futures = Vec::new();
79        let mut meshblu_entries = Vec::new();
80
81        for entry in forwarders.iter().take(MAX_FORWARDERS_PER_EVENT) {
82            match entry {
83                ForwarderEntry::Webhook(wh) => {
84                    webhook_futures.push(self.fire_webhook(device, wh, payload));
85                }
86                ForwarderEntry::Meshblu(mf) => {
87                    meshblu_entries.push(mf);
88                }
89            }
90        }
91
92        // Fire all webhooks concurrently
93        join_all(webhook_futures).await;
94
95        // Process meshblu forwarders sequentially (loop detection is order-dependent)
96        for mf in meshblu_entries {
97            self.fire_meshblu(device, mf, payload, forwarded_from).await;
98        }
99    }
100
101    async fn fire_webhook(&self, device: &Device, wh: &WebhookForwarder, payload: &Value) {
102        if !is_safe_url(&wh.url, self.allow_localhost) {
103            warn!("Webhook URL rejected (SSRF protection): {}", wh.url);
104            WEBHOOKS_FAILED.inc();
105            return;
106        }
107
108        let mut req = match wh.method.to_uppercase().as_str() {
109            "GET" => self.client.get(&wh.url),
110            "PUT" => self.client.put(&wh.url),
111            "DELETE" => self.client.delete(&wh.url),
112            _ => self.client.post(&wh.url),
113        };
114
115        req = req
116            .header("X-Meshblu-Uuid", device.uuid.to_string())
117            .header("Content-Type", "application/json")
118            .json(payload);
119
120        if wh.generate_and_forward_meshblu_credentials {
121            let opts = GenerateTokenOptions {
122                expires_on: Some(chrono::Utc::now().timestamp() + 300), // 5 min expiry
123                tag: Some("webhook-credential".to_string()),
124            };
125            if let Ok((_, plaintext)) = self.store.generate_token(&device.uuid, opts).await {
126                let cred = format!("{}:{}", device.uuid, plaintext);
127                let encoded = base64::Engine::encode(
128                    &base64::engine::general_purpose::STANDARD,
129                    cred.as_bytes(),
130                );
131                req = req.header("Authorization", format!("Bearer {}", encoded));
132            }
133        }
134
135        match req.send().await {
136            Ok(resp) => {
137                WEBHOOKS_SENT.inc();
138                debug!(
139                    "Webhook to {} returned {}",
140                    wh.url,
141                    resp.status().as_u16()
142                );
143            }
144            Err(e) => {
145                WEBHOOKS_FAILED.inc();
146                warn!("Webhook to {} failed: {}", wh.url, e);
147            }
148        }
149    }
150
151    async fn fire_meshblu(
152        &self,
153        device: &Device,
154        _mf: &MeshbluForwarder,
155        payload: &Value,
156        forwarded_from: &[Uuid],
157    ) {
158        // Loop detection
159        if forwarded_from.len() >= MAX_FORWARD_DEPTH {
160            warn!(
161                "Meshblu forwarder loop depth exceeded for device {}",
162                device.uuid
163            );
164            return;
165        }
166        if forwarded_from.contains(&device.uuid) {
167            warn!(
168                "Meshblu forwarder circular loop detected for device {}",
169                device.uuid
170            );
171            return;
172        }
173
174        // Re-emit as a message from this device to itself
175        let msg = Message {
176            devices: vec![device.uuid.to_string()],
177            from_uuid: Some(device.uuid),
178            topic: Some("forwarder".to_string()),
179            payload: Some(payload.clone()),
180            metadata: None,
181            extra: Default::default(),
182        };
183
184        let event = DeviceEvent::Message(msg);
185        let _ = self.bus.publish(&device.uuid, event).await;
186    }
187}
188
189/// SSRF protection: reject URLs targeting localhost, private IPs, link-local, and metadata endpoints.
190fn is_safe_url(url_str: &str, allow_localhost: bool) -> bool {
191    let url = match Url::parse(url_str) {
192        Ok(u) => u,
193        Err(_) => return false,
194    };
195
196    // Only allow http/https
197    match url.scheme() {
198        "http" | "https" => {}
199        _ => return false,
200    }
201
202    let host = match url.host_str() {
203        Some(h) => h,
204        None => return false,
205    };
206
207    // Allow localhost in test mode
208    if allow_localhost
209        && (host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]")
210    {
211        return true;
212    }
213
214    // Block common localhost names
215    if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" || host == "0.0.0.0" {
216        return false;
217    }
218
219    // Block AWS/GCP/Azure metadata endpoints
220    if host == "169.254.169.254" || host == "metadata.google.internal" {
221        return false;
222    }
223
224    // Parse and check IP addresses for private ranges
225    if let Ok(ip) = host.parse::<IpAddr>() {
226        return !is_private_ip(ip);
227    }
228
229    // Block hosts that look like they resolve to internal services
230    if host.ends_with(".internal") || host.ends_with(".local") || host.ends_with(".localhost") {
231        return false;
232    }
233
234    true
235}
236
/// Returns `true` when `ip` must not be used as a webhook destination.
///
/// IPv4 addresses are blocked when they are loopback, RFC 1918 private,
/// link-local (`169.254.0.0/16`, which contains the cloud metadata address),
/// broadcast, unspecified, inside `0.0.0.0/8`, or inside the shared
/// carrier-grade NAT range `100.64.0.0/10`.
///
/// IPv6 addresses are blocked when they are loopback (`::1`), unspecified
/// (`::`), unique-local (`fc00::/7`) or link-local (`fe80::/10`). An
/// IPv4-mapped address (`::ffff:a.b.c.d`) is judged by its embedded IPv4
/// address, so `::ffff:127.0.0.1` is blocked just like `127.0.0.1`.
fn is_private_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_private_ipv4(v4),
        IpAddr::V6(v6) => {
            let segments = v6.segments();

            // IPv4-mapped form: the last 32 bits carry an IPv4 address.
            if let [0, 0, 0, 0, 0, 0xffff, hi, lo] = segments {
                let [a, b] = hi.to_be_bytes();
                let [c, d] = lo.to_be_bytes();
                return is_private_ipv4(std::net::Ipv4Addr::new(a, b, c, d));
            }

            v6.is_loopback()
                || v6.is_unspecified()
                || (segments[0] & 0xfe00) == 0xfc00 // unique-local fc00::/7
                || (segments[0] & 0xffc0) == 0xfe80 // link-local fe80::/10
        }
    }
}

/// IPv4 half of `is_private_ip`; also applied to IPv4-mapped IPv6 addresses.
fn is_private_ipv4(v4: std::net::Ipv4Addr) -> bool {
    let [first, second, ..] = v4.octets();
    v4.is_loopback()
        || v4.is_private()
        || v4.is_link_local()
        || v4.is_broadcast()
        || v4.is_unspecified()
        || first == 0 // 0.0.0.0/8 ("this network")
        || (first == 100 && (second & 0xc0) == 64) // 100.64.0.0/10 (CGNAT)
}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every URL is accepted when localhost access is `allow_localhost`.
    fn assert_all_accepted(urls: &[&str], allow_localhost: bool) {
        for url in urls {
            assert!(
                is_safe_url(url, allow_localhost),
                "expected {} to be accepted",
                url
            );
        }
    }

    /// Asserts that every URL is rejected with localhost access disabled.
    fn assert_all_rejected(urls: &[&str]) {
        for url in urls {
            assert!(!is_safe_url(url, false), "expected {} to be rejected", url);
        }
    }

    #[test]
    fn safe_url_allows_public() {
        assert_all_accepted(
            &["https://example.com/hook", "http://api.external.io/webhook"],
            false,
        );
    }

    #[test]
    fn safe_url_blocks_localhost() {
        assert_all_rejected(&[
            "http://localhost/hook",
            "http://127.0.0.1/hook",
            "http://[::1]/hook",
            "http://0.0.0.0/hook",
        ]);
    }

    #[test]
    fn safe_url_allows_localhost_when_enabled() {
        assert_all_accepted(&["http://127.0.0.1/hook", "http://localhost/hook"], true);
    }

    #[test]
    fn safe_url_blocks_private_ips() {
        assert_all_rejected(&[
            "http://10.0.0.1/hook",
            "http://192.168.1.1/hook",
            "http://172.16.0.1/hook",
        ]);
    }

    #[test]
    fn safe_url_blocks_metadata() {
        assert_all_rejected(&[
            "http://169.254.169.254/latest/meta-data/",
            "http://metadata.google.internal/computeMetadata/",
        ]);
    }

    #[test]
    fn safe_url_blocks_non_http() {
        assert_all_rejected(&[
            "file:///etc/passwd",
            "ftp://example.com/file",
            "gopher://evil.com",
        ]);
    }

    #[test]
    fn safe_url_blocks_internal_tlds() {
        assert_all_rejected(&[
            "http://service.internal/hook",
            "http://host.local/hook",
            "http://app.localhost/hook",
        ]);
    }
}
303}