1use std::net::IpAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::future::join_all;
6
7use freshblu_core::{
8 device::Device,
9 forwarder::{ForwarderEntry, ForwarderEvent, MeshbluForwarder, WebhookForwarder},
10 message::{DeviceEvent, Message},
11 token::GenerateTokenOptions,
12};
13use freshblu_store::DynStore;
14use serde_json::Value;
15use tracing::{debug, warn};
16use url::Url;
17use uuid::Uuid;
18
19use crate::bus::DynBus;
20use crate::metrics::{WEBHOOKS_FAILED, WEBHOOKS_SENT};
21
22const MAX_FORWARD_DEPTH: usize = 5;
23const MAX_FORWARDERS_PER_EVENT: usize = 10;
24
25pub struct WebhookExecutor {
26 client: reqwest::Client,
27 store: DynStore,
28 bus: DynBus,
29 allow_localhost: bool,
30}
31
32impl WebhookExecutor {
33 pub fn new(store: DynStore, bus: DynBus) -> Self {
34 let client = reqwest::Client::builder()
35 .timeout(Duration::from_secs(10))
36 .build()
37 .expect("reqwest client");
38 Self {
39 client,
40 store,
41 bus,
42 allow_localhost: false,
43 }
44 }
45
46 #[cfg(test)]
48 pub fn new_with_localhost(store: DynStore, bus: DynBus) -> Self {
49 let mut this = Self::new(store, bus);
50 this.allow_localhost = true;
51 this
52 }
53
54 pub fn set_allow_localhost(&mut self, allow: bool) {
56 self.allow_localhost = allow;
57 }
58
59 pub async fn execute(
61 self: &Arc<Self>,
62 device: &Device,
63 event: ForwarderEvent,
64 payload: &Value,
65 forwarded_from: &[Uuid],
66 ) {
67 let forwarders = match &device.meshblu.forwarders {
68 Some(f) => f.get(event),
69 None => return,
70 };
71
72 if forwarders.is_empty() {
73 return;
74 }
75
76 let mut webhook_futures = Vec::new();
79 let mut meshblu_entries = Vec::new();
80
81 for entry in forwarders.iter().take(MAX_FORWARDERS_PER_EVENT) {
82 match entry {
83 ForwarderEntry::Webhook(wh) => {
84 webhook_futures.push(self.fire_webhook(device, wh, payload));
85 }
86 ForwarderEntry::Meshblu(mf) => {
87 meshblu_entries.push(mf);
88 }
89 }
90 }
91
92 join_all(webhook_futures).await;
94
95 for mf in meshblu_entries {
97 self.fire_meshblu(device, mf, payload, forwarded_from).await;
98 }
99 }
100
101 async fn fire_webhook(&self, device: &Device, wh: &WebhookForwarder, payload: &Value) {
102 if !is_safe_url(&wh.url, self.allow_localhost) {
103 warn!("Webhook URL rejected (SSRF protection): {}", wh.url);
104 WEBHOOKS_FAILED.inc();
105 return;
106 }
107
108 let mut req = match wh.method.to_uppercase().as_str() {
109 "GET" => self.client.get(&wh.url),
110 "PUT" => self.client.put(&wh.url),
111 "DELETE" => self.client.delete(&wh.url),
112 _ => self.client.post(&wh.url),
113 };
114
115 req = req
116 .header("X-Meshblu-Uuid", device.uuid.to_string())
117 .header("Content-Type", "application/json")
118 .json(payload);
119
120 if wh.generate_and_forward_meshblu_credentials {
121 let opts = GenerateTokenOptions {
122 expires_on: Some(chrono::Utc::now().timestamp() + 300), tag: Some("webhook-credential".to_string()),
124 };
125 if let Ok((_, plaintext)) = self.store.generate_token(&device.uuid, opts).await {
126 let cred = format!("{}:{}", device.uuid, plaintext);
127 let encoded = base64::Engine::encode(
128 &base64::engine::general_purpose::STANDARD,
129 cred.as_bytes(),
130 );
131 req = req.header("Authorization", format!("Bearer {}", encoded));
132 }
133 }
134
135 match req.send().await {
136 Ok(resp) => {
137 WEBHOOKS_SENT.inc();
138 debug!(
139 "Webhook to {} returned {}",
140 wh.url,
141 resp.status().as_u16()
142 );
143 }
144 Err(e) => {
145 WEBHOOKS_FAILED.inc();
146 warn!("Webhook to {} failed: {}", wh.url, e);
147 }
148 }
149 }
150
151 async fn fire_meshblu(
152 &self,
153 device: &Device,
154 _mf: &MeshbluForwarder,
155 payload: &Value,
156 forwarded_from: &[Uuid],
157 ) {
158 if forwarded_from.len() >= MAX_FORWARD_DEPTH {
160 warn!(
161 "Meshblu forwarder loop depth exceeded for device {}",
162 device.uuid
163 );
164 return;
165 }
166 if forwarded_from.contains(&device.uuid) {
167 warn!(
168 "Meshblu forwarder circular loop detected for device {}",
169 device.uuid
170 );
171 return;
172 }
173
174 let msg = Message {
176 devices: vec![device.uuid.to_string()],
177 from_uuid: Some(device.uuid),
178 topic: Some("forwarder".to_string()),
179 payload: Some(payload.clone()),
180 metadata: None,
181 extra: Default::default(),
182 };
183
184 let event = DeviceEvent::Message(msg);
185 let _ = self.bus.publish(&device.uuid, event).await;
186 }
187}
188
189fn is_safe_url(url_str: &str, allow_localhost: bool) -> bool {
191 let url = match Url::parse(url_str) {
192 Ok(u) => u,
193 Err(_) => return false,
194 };
195
196 match url.scheme() {
198 "http" | "https" => {}
199 _ => return false,
200 }
201
202 let host = match url.host_str() {
203 Some(h) => h,
204 None => return false,
205 };
206
207 if allow_localhost
209 && (host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]")
210 {
211 return true;
212 }
213
214 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" || host == "0.0.0.0" {
216 return false;
217 }
218
219 if host == "169.254.169.254" || host == "metadata.google.internal" {
221 return false;
222 }
223
224 if let Ok(ip) = host.parse::<IpAddr>() {
226 return !is_private_ip(ip);
227 }
228
229 if host.ends_with(".internal") || host.ends_with(".local") || host.ends_with(".localhost") {
231 return false;
232 }
233
234 true
235}
236
237fn is_private_ip(ip: IpAddr) -> bool {
238 match ip {
239 IpAddr::V4(v4) => {
240 v4.is_loopback()
241 || v4.is_private()
242 || v4.is_link_local()
243 || v4.is_broadcast()
244 || v4.is_unspecified()
245 || v4.octets()[0] == 169 && v4.octets()[1] == 254 }
247 IpAddr::V6(v6) => {
248 v6.is_loopback() || v6.is_unspecified()
249 }
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256
257 #[test]
258 fn safe_url_allows_public() {
259 assert!(is_safe_url("https://example.com/hook", false));
260 assert!(is_safe_url("http://api.external.io/webhook", false));
261 }
262
263 #[test]
264 fn safe_url_blocks_localhost() {
265 assert!(!is_safe_url("http://localhost/hook", false));
266 assert!(!is_safe_url("http://127.0.0.1/hook", false));
267 assert!(!is_safe_url("http://[::1]/hook", false));
268 assert!(!is_safe_url("http://0.0.0.0/hook", false));
269 }
270
271 #[test]
272 fn safe_url_allows_localhost_when_enabled() {
273 assert!(is_safe_url("http://127.0.0.1/hook", true));
274 assert!(is_safe_url("http://localhost/hook", true));
275 }
276
277 #[test]
278 fn safe_url_blocks_private_ips() {
279 assert!(!is_safe_url("http://10.0.0.1/hook", false));
280 assert!(!is_safe_url("http://192.168.1.1/hook", false));
281 assert!(!is_safe_url("http://172.16.0.1/hook", false));
282 }
283
284 #[test]
285 fn safe_url_blocks_metadata() {
286 assert!(!is_safe_url("http://169.254.169.254/latest/meta-data/", false));
287 assert!(!is_safe_url("http://metadata.google.internal/computeMetadata/", false));
288 }
289
290 #[test]
291 fn safe_url_blocks_non_http() {
292 assert!(!is_safe_url("file:///etc/passwd", false));
293 assert!(!is_safe_url("ftp://example.com/file", false));
294 assert!(!is_safe_url("gopher://evil.com", false));
295 }
296
297 #[test]
298 fn safe_url_blocks_internal_tlds() {
299 assert!(!is_safe_url("http://service.internal/hook", false));
300 assert!(!is_safe_url("http://host.local/hook", false));
301 assert!(!is_safe_url("http://app.localhost/hook", false));
302 }
303}