1use std::sync::Arc;
33use std::time::Duration;
34
35use base64::Engine;
36use nexo_broker::{AnyBroker, BrokerHandle, Message};
37use serde::{Deserialize, Serialize};
38use serde_json::json;
39
/// Timeout applied to a route when `PluginHttpRouter::register` is called
/// with `timeout: None`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct PluginHttpResponse {
45 pub status: u16,
48 #[serde(default)]
52 pub headers: Vec<(String, String)>,
53 #[serde(default)]
55 pub body_base64: String,
56}
57
58impl PluginHttpResponse {
59 pub fn decoded_body(&self) -> Vec<u8> {
62 if self.body_base64.is_empty() {
63 return Vec::new();
64 }
65 base64::engine::general_purpose::STANDARD
66 .decode(self.body_base64.as_bytes())
67 .unwrap_or_default()
68 }
69
70 pub fn header(&self, name: &str) -> Option<&str> {
73 self.headers
74 .iter()
75 .find(|(k, _)| k.eq_ignore_ascii_case(name))
76 .map(|(_, v)| v.as_str())
77 }
78}
79
/// A single router entry: the plugin that serves a mount prefix and the
/// timeout to use when forwarding requests to it.
#[derive(Debug, Clone)]
struct Route {
    /// URL path prefix this route is mounted at.
    mount_prefix: String,
    /// Identifier of the plugin that owns the prefix.
    plugin_id: String,
    /// Timeout associated with this route.
    timeout: Duration,
}
88
/// Path prefixes that plugins are not allowed to mount on.
///
/// `PluginHttpRouter::register` rejects a `mount_prefix` that equals one of
/// these entries or is nested below one (`<entry>/...`).
pub const RESERVED_PREFIXES: &[&str] = &[
    "/health",
    "/metrics",
    "/pair",
    "/admin",
    "/.well-known",
];
100
101#[derive(Debug, Clone, Default)]
106pub struct PluginHttpRouter {
107 routes: Vec<Route>,
108}
109
110impl PluginHttpRouter {
111 pub fn new() -> Self {
112 Self::default()
113 }
114
115 pub fn register(
126 &mut self,
127 plugin_id: &str,
128 mount_prefix: &str,
129 timeout: Option<Duration>,
130 ) -> Result<(), RouteRegistrationError> {
131 for reserved in RESERVED_PREFIXES {
132 if mount_prefix == *reserved || mount_prefix.starts_with(&format!("{reserved}/")) {
133 return Err(RouteRegistrationError::Reserved {
134 requested: mount_prefix.to_string(),
135 reserved: (*reserved).to_string(),
136 });
137 }
138 }
139 self.routes.retain(|r| r.plugin_id != plugin_id);
142 self.routes.push(Route {
143 mount_prefix: mount_prefix.to_string(),
144 plugin_id: plugin_id.to_string(),
145 timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
146 });
147 self.sort();
148 Ok(())
149 }
150
151 fn sort(&mut self) {
155 self.routes.sort_by(|a, b| {
156 b.mount_prefix
157 .len()
158 .cmp(&a.mount_prefix.len())
159 .then_with(|| a.plugin_id.cmp(&b.plugin_id))
160 });
161 }
162
163 pub fn match_path(&self, path: &str) -> Option<(&str, Duration)> {
167 self.routes
168 .iter()
169 .find(|r| path.starts_with(&r.mount_prefix))
170 .map(|r| (r.plugin_id.as_str(), r.timeout))
171 }
172
173 pub fn is_empty(&self) -> bool {
174 self.routes.is_empty()
175 }
176}
177
178pub async fn forward_request(
189 broker: &AnyBroker,
190 plugin_id: &str,
191 method: &str,
192 path: &str,
193 query: &str,
194 headers: &[(String, String)],
195 body: &[u8],
196 timeout: Duration,
197) -> Result<PluginHttpResponse, PluginHttpForwardError> {
198 let topic = format!("plugin.{plugin_id}.http.request");
199 let body_b64 = base64::engine::general_purpose::STANDARD.encode(body);
200 let payload = json!({
201 "method": method,
202 "path": path,
203 "query": query,
204 "headers": headers,
205 "body_base64": body_b64,
206 });
207 let msg = Message::new(topic.clone(), payload);
208 let reply = broker
209 .request(&topic, msg, timeout)
210 .await
211 .map_err(|e| PluginHttpForwardError::Broker(e.to_string()))?;
212 serde_json::from_value::<PluginHttpResponse>(reply.payload).map_err(|e| {
213 PluginHttpForwardError::ParseReply(format!(
214 "plugin {plugin_id} returned malformed http reply: {e}"
215 ))
216 })
217}
218
219#[derive(Debug, thiserror::Error)]
223pub enum RouteRegistrationError {
224 #[error("mount_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`")]
225 Reserved { requested: String, reserved: String },
226}
227
228#[derive(Debug, thiserror::Error)]
233pub enum PluginHttpForwardError {
234 #[error("broker error: {0}")]
235 Broker(String),
236 #[error("plugin reply parse error: {0}")]
237 ParseReply(String),
238}
239
240pub fn build_router_from_handles(
243 handles: &std::collections::BTreeMap<String, Arc<dyn DynPluginManifest>>,
244) -> PluginHttpRouter {
245 let mut router = PluginHttpRouter::new();
246 for (plugin_id, handle) in handles.iter() {
247 if let Some(section) = handle.http_section() {
248 let _ = router.register(plugin_id, §ion.mount_prefix, section.timeout);
249 }
250 }
251 router
252}
253
254pub trait DynPluginManifest: Send + Sync {
265 fn http_section(&self) -> Option<HttpSectionView>;
266}
267
/// HTTP mount settings exposed by a plugin manifest.
#[derive(Debug, Clone)]
pub struct HttpSectionView {
    /// URL path prefix the plugin wants to be mounted at.
    pub mount_prefix: String,
    /// Optional per-plugin timeout; `None` lets the router apply its default.
    pub timeout: Option<Duration>,
}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a router by registering each `(plugin_id, mount_prefix)` pair in
    /// order with the default timeout.
    fn router_with(mounts: &[(&str, &str)]) -> PluginHttpRouter {
        let mut router = PluginHttpRouter::new();
        for (plugin_id, mount_prefix) in mounts {
            router.register(plugin_id, mount_prefix, None).unwrap();
        }
        router
    }

    // The more specific of two overlapping prefixes wins.
    #[test]
    fn match_path_uses_longest_prefix_first() {
        let router = router_with(&[("plugin_short", "/api"), ("plugin_long", "/api/v1")]);
        let matched = router.match_path("/api/v1/users").expect("matches");
        assert_eq!(matched.0, "plugin_long");
    }

    // A path outside the longer prefix still resolves to the shorter one.
    #[test]
    fn match_path_falls_back_to_shorter_prefix() {
        let router = router_with(&[("plugin_short", "/api"), ("plugin_long", "/api/v1")]);
        let matched = router.match_path("/api/v2/users").expect("matches");
        assert_eq!(matched.0, "plugin_short");
    }

    #[test]
    fn match_path_returns_none_for_no_match() {
        let router = router_with(&[("plugin", "/whatsapp")]);
        assert!(router.match_path("/foo").is_none());
        assert!(router.match_path("/").is_none());
    }

    // Registering the same plugin twice keeps only the latest mount.
    #[test]
    fn register_replaces_existing_entry_for_same_plugin() {
        let router = router_with(&[("plugin", "/old"), ("plugin", "/new")]);
        assert!(router.match_path("/old").is_none());
        assert!(router.match_path("/new").is_some());
    }

    #[test]
    fn register_applies_custom_timeout() {
        let custom = Duration::from_secs(120);
        let mut router = PluginHttpRouter::new();
        router.register("plugin", "/slow", Some(custom)).unwrap();
        let (_, timeout) = router.match_path("/slow/foo").unwrap();
        assert_eq!(timeout, Duration::from_secs(120));
    }

    #[test]
    fn register_default_timeout_when_none() {
        let router = router_with(&[("plugin", "/fast")]);
        let (_, timeout) = router.match_path("/fast/foo").unwrap();
        assert_eq!(timeout, DEFAULT_TIMEOUT);
    }

    #[test]
    fn register_rejects_reserved_prefixes() {
        let mut router = PluginHttpRouter::new();
        for reserved in RESERVED_PREFIXES {
            let result = router.register("evil_plugin", reserved, None);
            assert!(
                matches!(result, Err(RouteRegistrationError::Reserved { .. })),
                "expected reservation rejection for `{reserved}`, got {result:?}",
            );
        }
    }

    #[test]
    fn register_rejects_subpath_of_reserved() {
        let mut router = PluginHttpRouter::new();
        let outcome = router.register("evil_plugin", "/health/foo", None);
        assert!(matches!(
            outcome,
            Err(RouteRegistrationError::Reserved { .. })
        ));
    }

    // Prefixes that merely begin with the same characters as a reserved
    // prefix are not reserved themselves.
    #[test]
    fn register_accepts_prefixes_that_only_share_substring_with_reserved() {
        let mut router = PluginHttpRouter::new();
        assert!(router.register("plugin", "/healthy", None).is_ok());
        assert!(router
            .register("plugin2", "/metrics-aggregator", None)
            .is_ok());
    }

    #[test]
    fn plugin_http_response_decodes_body() {
        let encoded = base64::engine::general_purpose::STANDARD.encode("<html/>");
        let response = PluginHttpResponse {
            status: 200,
            headers: vec![("Content-Type".into(), "text/html".into())],
            body_base64: encoded,
        };
        assert_eq!(response.decoded_body(), b"<html/>");
        assert_eq!(response.header("content-type"), Some("text/html"));
        assert_eq!(response.header("CONTENT-TYPE"), Some("text/html"));
    }

    // With nothing subscribed to the plugin topic the request must surface
    // as a broker error rather than a parsed response.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_when_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let outcome = forward_request(
            &broker,
            "plugin",
            "GET",
            "/whatsapp/pair",
            "",
            &[],
            &[],
            Duration::from_millis(100),
        )
        .await;
        assert!(matches!(outcome, Err(PluginHttpForwardError::Broker(_))));
    }
}