1use std::sync::Arc;
33use std::time::Duration;
34
35use base64::Engine;
36use nexo_broker::{AnyBroker, BrokerHandle, Message};
37use serde::{Deserialize, Serialize};
38use serde_json::json;
39
40const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct PluginHttpResponse {
45 pub status: u16,
48 #[serde(default)]
52 pub headers: Vec<(String, String)>,
53 #[serde(default)]
55 pub body_base64: String,
56}
57
58impl PluginHttpResponse {
59 pub fn decoded_body(&self) -> Vec<u8> {
62 if self.body_base64.is_empty() {
63 return Vec::new();
64 }
65 base64::engine::general_purpose::STANDARD
66 .decode(self.body_base64.as_bytes())
67 .unwrap_or_default()
68 }
69
70 pub fn header(&self, name: &str) -> Option<&str> {
73 self.headers
74 .iter()
75 .find(|(k, _)| k.eq_ignore_ascii_case(name))
76 .map(|(_, v)| v.as_str())
77 }
78}
79
80#[derive(Debug, Clone)]
83struct Route {
84 mount_prefix: String,
85 plugin_id: String,
86 timeout: Duration,
87}
88
89pub const RESERVED_PREFIXES: &[&str] = &[
100 "/health",
101 "/metrics",
102 "/pair",
103 "/admin",
104 "/.well-known",
105];
106
107#[derive(Debug, Clone, Default)]
112pub struct PluginHttpRouter {
113 routes: Vec<Route>,
114}
115
116impl PluginHttpRouter {
117 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn register(
132 &mut self,
133 plugin_id: &str,
134 mount_prefix: &str,
135 timeout: Option<Duration>,
136 ) -> Result<(), RouteRegistrationError> {
137 for reserved in RESERVED_PREFIXES {
138 if mount_prefix == *reserved
139 || mount_prefix.starts_with(&format!("{reserved}/"))
140 {
141 return Err(RouteRegistrationError::Reserved {
142 requested: mount_prefix.to_string(),
143 reserved: (*reserved).to_string(),
144 });
145 }
146 }
147 self.routes.retain(|r| r.plugin_id != plugin_id);
150 self.routes.push(Route {
151 mount_prefix: mount_prefix.to_string(),
152 plugin_id: plugin_id.to_string(),
153 timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
154 });
155 self.sort();
156 Ok(())
157 }
158
159 fn sort(&mut self) {
163 self.routes.sort_by(|a, b| {
164 b.mount_prefix
165 .len()
166 .cmp(&a.mount_prefix.len())
167 .then_with(|| a.plugin_id.cmp(&b.plugin_id))
168 });
169 }
170
171 pub fn match_path(&self, path: &str) -> Option<(&str, Duration)> {
175 self.routes
176 .iter()
177 .find(|r| path.starts_with(&r.mount_prefix))
178 .map(|r| (r.plugin_id.as_str(), r.timeout))
179 }
180
181 pub fn is_empty(&self) -> bool {
182 self.routes.is_empty()
183 }
184}
185
186pub async fn forward_request(
197 broker: &AnyBroker,
198 plugin_id: &str,
199 method: &str,
200 path: &str,
201 query: &str,
202 headers: &[(String, String)],
203 body: &[u8],
204 timeout: Duration,
205) -> Result<PluginHttpResponse, PluginHttpForwardError> {
206 let topic = format!("plugin.{plugin_id}.http.request");
207 let body_b64 = base64::engine::general_purpose::STANDARD.encode(body);
208 let payload = json!({
209 "method": method,
210 "path": path,
211 "query": query,
212 "headers": headers,
213 "body_base64": body_b64,
214 });
215 let msg = Message::new(topic.clone(), payload);
216 let reply = broker
217 .request(&topic, msg, timeout)
218 .await
219 .map_err(|e| PluginHttpForwardError::Broker(e.to_string()))?;
220 serde_json::from_value::<PluginHttpResponse>(reply.payload).map_err(|e| {
221 PluginHttpForwardError::ParseReply(format!(
222 "plugin {plugin_id} returned malformed http reply: {e}"
223 ))
224 })
225}
226
227#[derive(Debug, thiserror::Error)]
231pub enum RouteRegistrationError {
232 #[error("mount_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`")]
233 Reserved {
234 requested: String,
235 reserved: String,
236 },
237}
238
239#[derive(Debug, thiserror::Error)]
244pub enum PluginHttpForwardError {
245 #[error("broker error: {0}")]
246 Broker(String),
247 #[error("plugin reply parse error: {0}")]
248 ParseReply(String),
249}
250
251pub fn build_router_from_handles(
254 handles: &std::collections::BTreeMap<String, Arc<dyn DynPluginManifest>>,
255) -> PluginHttpRouter {
256 let mut router = PluginHttpRouter::new();
257 for (plugin_id, handle) in handles.iter() {
258 if let Some(section) = handle.http_section() {
259 let _ = router.register(plugin_id, §ion.mount_prefix, section.timeout);
260 }
261 }
262 router
263}
264
265pub trait DynPluginManifest: Send + Sync {
276 fn http_section(&self) -> Option<HttpSectionView>;
277}
278
279#[derive(Debug, Clone)]
285pub struct HttpSectionView {
286 pub mount_prefix: String,
287 pub timeout: Option<Duration>,
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293
294 #[test]
295 fn match_path_uses_longest_prefix_first() {
296 let mut r = PluginHttpRouter::new();
297 r.register("plugin_short", "/api", None).unwrap();
298 r.register("plugin_long", "/api/v1", None).unwrap();
299 let (id, _) = r.match_path("/api/v1/users").expect("matches");
300 assert_eq!(id, "plugin_long");
301 }
302
303 #[test]
304 fn match_path_falls_back_to_shorter_prefix() {
305 let mut r = PluginHttpRouter::new();
306 r.register("plugin_short", "/api", None).unwrap();
307 r.register("plugin_long", "/api/v1", None).unwrap();
308 let (id, _) = r.match_path("/api/v2/users").expect("matches");
309 assert_eq!(id, "plugin_short");
310 }
311
312 #[test]
313 fn match_path_returns_none_for_no_match() {
314 let mut r = PluginHttpRouter::new();
315 r.register("plugin", "/whatsapp", None).unwrap();
316 assert!(r.match_path("/foo").is_none());
317 assert!(r.match_path("/").is_none());
318 }
319
320 #[test]
321 fn register_replaces_existing_entry_for_same_plugin() {
322 let mut r = PluginHttpRouter::new();
323 r.register("plugin", "/old", None).unwrap();
324 r.register("plugin", "/new", None).unwrap();
325 assert!(r.match_path("/old").is_none());
326 assert!(r.match_path("/new").is_some());
327 }
328
329 #[test]
330 fn register_applies_custom_timeout() {
331 let mut r = PluginHttpRouter::new();
332 r.register("plugin", "/slow", Some(Duration::from_secs(120))).unwrap();
333 let (_, t) = r.match_path("/slow/foo").unwrap();
334 assert_eq!(t, Duration::from_secs(120));
335 }
336
337 #[test]
338 fn register_default_timeout_when_none() {
339 let mut r = PluginHttpRouter::new();
340 r.register("plugin", "/fast", None).unwrap();
341 let (_, t) = r.match_path("/fast/foo").unwrap();
342 assert_eq!(t, DEFAULT_TIMEOUT);
343 }
344
345 #[test]
346 fn register_rejects_reserved_prefixes() {
347 let mut r = PluginHttpRouter::new();
348 for reserved in RESERVED_PREFIXES {
349 let result = r.register("evil_plugin", reserved, None);
350 assert!(
351 matches!(result, Err(RouteRegistrationError::Reserved { .. })),
352 "expected reservation rejection for `{reserved}`, got {result:?}",
353 );
354 }
355 }
356
357 #[test]
358 fn register_rejects_subpath_of_reserved() {
359 let mut r = PluginHttpRouter::new();
360 let result = r.register("evil_plugin", "/health/foo", None);
363 assert!(matches!(result, Err(RouteRegistrationError::Reserved { .. })));
364 }
365
366 #[test]
367 fn register_accepts_prefixes_that_only_share_substring_with_reserved() {
368 let mut r = PluginHttpRouter::new();
369 assert!(r.register("plugin", "/healthy", None).is_ok());
372 assert!(r.register("plugin2", "/metrics-aggregator", None).is_ok());
373 }
374
375 #[test]
376 fn plugin_http_response_decodes_body() {
377 let response = PluginHttpResponse {
378 status: 200,
379 headers: vec![("Content-Type".into(), "text/html".into())],
380 body_base64: base64::engine::general_purpose::STANDARD.encode("<html/>"),
381 };
382 assert_eq!(response.decoded_body(), b"<html/>");
383 assert_eq!(response.header("content-type"), Some("text/html"));
384 assert_eq!(response.header("CONTENT-TYPE"), Some("text/html"));
385 }
386
387 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
388 async fn forward_request_returns_broker_error_when_no_subscriber() {
389 let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
390 let result = forward_request(
391 &broker,
392 "plugin",
393 "GET",
394 "/whatsapp/pair",
395 "",
396 &[],
397 &[],
398 Duration::from_millis(100),
399 )
400 .await;
401 assert!(matches!(result, Err(PluginHttpForwardError::Broker(_))));
402 }
403}