use std::sync::Arc;
use std::time::Duration;
use base64::Engine;
use nexo_broker::{AnyBroker, BrokerHandle, Message};
use serde::{Deserialize, Serialize};
use serde_json::json;
/// Timeout stored on a route when `PluginHttpRouter::register` is called with
/// `timeout: None` (30 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// HTTP response produced by a plugin, as deserialized from the payload of a
/// broker reply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginHttpResponse {
/// Status code reported by the plugin. This is the only field without a
/// serde default, so it must be present in the reply payload.
pub status: u16,
/// Response headers as `(name, value)` pairs; defaults to an empty list
/// when the field is missing from the payload.
#[serde(default)]
pub headers: Vec<(String, String)>,
/// Response body encoded as base64 text; defaults to an empty string when
/// the field is missing from the payload.
#[serde(default)]
pub body_base64: String,
}
impl PluginHttpResponse {
    /// Decodes `body_base64` with the standard base64 alphabet.
    ///
    /// Returns an empty vector both when the body is empty and when the text
    /// is not valid base64; decode failures are not reported to the caller.
    pub fn decoded_body(&self) -> Vec<u8> {
        let encoded = self.body_base64.as_bytes();
        if encoded.is_empty() {
            Vec::new()
        } else {
            match base64::engine::general_purpose::STANDARD.decode(encoded) {
                Ok(bytes) => bytes,
                // Malformed base64 is treated the same as "no body".
                Err(_) => Vec::new(),
            }
        }
    }

    /// Returns the value of the first header whose name equals `name`,
    /// compared ASCII case-insensitively, or `None` if there is no such header.
    pub fn header(&self, name: &str) -> Option<&str> {
        for (key, value) in &self.headers {
            if key.eq_ignore_ascii_case(name) {
                return Some(value.as_str());
            }
        }
        None
    }
}
/// One registered mount: requests whose path matches `mount_prefix` are
/// attributed to `plugin_id`.
#[derive(Debug, Clone)]
struct Route {
/// Path prefix this route is matched against.
mount_prefix: String,
/// Identifier of the plugin that owns the prefix.
plugin_id: String,
/// Timeout handed back to the caller together with `plugin_id` on a match.
timeout: Duration,
}
/// Path prefixes that plugins may not mount on. `PluginHttpRouter::register`
/// rejects a mount prefix that equals one of these or is a sub-path of one.
pub const RESERVED_PREFIXES: &[&str] = &["/health", "/metrics", "/pair", "/admin", "/.well-known"];
/// Table mapping URL path prefixes to the plugin that serves them.
#[derive(Debug, Clone, Default)]
pub struct PluginHttpRouter {
/// Registered routes. Re-sorted after every successful registration so that
/// longer prefixes come first, with ties ordered by plugin id.
routes: Vec<Route>,
}
impl PluginHttpRouter {
    /// Creates a router with no routes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers the HTTP mount for `plugin_id`, replacing any route the same
    /// plugin registered earlier.
    ///
    /// `timeout` falls back to `DEFAULT_TIMEOUT` when `None`.
    ///
    /// # Errors
    ///
    /// Returns [`RouteRegistrationError::Reserved`] when `mount_prefix` equals
    /// one of `RESERVED_PREFIXES` or is a sub-path of one. The router is left
    /// untouched in that case, so a previously registered route for the same
    /// plugin stays in place.
    pub fn register(
        &mut self,
        plugin_id: &str,
        mount_prefix: &str,
        timeout: Option<Duration>,
    ) -> Result<(), RouteRegistrationError> {
        for reserved in RESERVED_PREFIXES {
            if Self::covers(reserved, mount_prefix) {
                return Err(RouteRegistrationError::Reserved {
                    requested: mount_prefix.to_string(),
                    reserved: (*reserved).to_string(),
                });
            }
        }

        // A plugin owns at most one mount: drop its previous route first.
        self.routes.retain(|route| route.plugin_id != plugin_id);
        self.routes.push(Route {
            mount_prefix: mount_prefix.to_string(),
            plugin_id: plugin_id.to_string(),
            timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
        });
        self.sort();
        Ok(())
    }

    /// Orders routes by descending prefix length so the first match found by
    /// `match_path` is the most specific one; ties are ordered by plugin id to
    /// keep the result deterministic.
    fn sort(&mut self) {
        self.routes.sort_by(|a, b| {
            b.mount_prefix
                .len()
                .cmp(&a.mount_prefix.len())
                .then_with(|| a.plugin_id.cmp(&b.plugin_id))
        });
    }

    /// Returns the plugin id and timeout of the most specific route whose
    /// mount prefix covers `path`, or `None` when no route applies.
    ///
    /// Matching respects path-segment boundaries: a mount of `/api` covers
    /// `/api` and `/api/users` but not `/apiary`. This mirrors how `register`
    /// treats reserved prefixes (`/healthy` does not collide with `/health`).
    pub fn match_path(&self, path: &str) -> Option<(&str, Duration)> {
        self.routes
            .iter()
            .find(|route| Self::covers(&route.mount_prefix, path))
            .map(|route| (route.plugin_id.as_str(), route.timeout))
    }

    /// Returns `true` when no route has been registered.
    pub fn is_empty(&self) -> bool {
        self.routes.is_empty()
    }

    /// Reports whether `path` equals `prefix` or lies beneath it on a
    /// path-segment boundary.
    ///
    /// A prefix that already ends in `/` (including a bare `/`) covers every
    /// path that starts with it, since the boundary is part of the prefix.
    fn covers(prefix: &str, path: &str) -> bool {
        match path.strip_prefix(prefix) {
            Some(rest) => rest.is_empty() || rest.starts_with('/') || prefix.ends_with('/'),
            None => false,
        }
    }
}
/// Sends an HTTP request description to a plugin over the broker and parses
/// the plugin's reply.
///
/// The request is published as a JSON payload (`method`, `path`, `query`,
/// `headers`, `body_base64`) on the topic `plugin.{plugin_id}.http.request`,
/// with the body encoded using the standard base64 alphabet.
///
/// # Errors
///
/// * [`PluginHttpForwardError::Broker`] when the broker request fails; the
///   variant carries the broker error rendered as text.
/// * [`PluginHttpForwardError::ParseReply`] when the reply payload cannot be
///   deserialized into a [`PluginHttpResponse`].
pub async fn forward_request(
    broker: &AnyBroker,
    plugin_id: &str,
    method: &str,
    path: &str,
    query: &str,
    headers: &[(String, String)],
    body: &[u8],
    timeout: Duration,
) -> Result<PluginHttpResponse, PluginHttpForwardError> {
    let subject = format!("plugin.{plugin_id}.http.request");
    let encoded_body = base64::engine::general_purpose::STANDARD.encode(body);
    let envelope = json!({
        "method": method,
        "path": path,
        "query": query,
        "headers": headers,
        "body_base64": encoded_body,
    });
    let request = Message::new(subject.clone(), envelope);

    let reply = match broker.request(&subject, request, timeout).await {
        Ok(reply) => reply,
        Err(e) => return Err(PluginHttpForwardError::Broker(e.to_string())),
    };

    match serde_json::from_value::<PluginHttpResponse>(reply.payload) {
        Ok(response) => Ok(response),
        Err(e) => Err(PluginHttpForwardError::ParseReply(format!(
            "plugin {plugin_id} returned malformed http reply: {e}"
        ))),
    }
}
/// Reasons `PluginHttpRouter::register` can refuse a mount.
#[derive(Debug, thiserror::Error)]
pub enum RouteRegistrationError {
/// The requested prefix equals, or is a sub-path of, a reserved prefix.
/// `requested` is the prefix the caller asked for; `reserved` is the entry
/// of `RESERVED_PREFIXES` it collided with.
#[error("mount_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`")]
Reserved { requested: String, reserved: String },
}
/// Failures reported by `forward_request`.
#[derive(Debug, thiserror::Error)]
pub enum PluginHttpForwardError {
/// The broker request itself failed; carries the broker error rendered as
/// a string.
#[error("broker error: {0}")]
Broker(String),
/// A reply arrived but its payload could not be deserialized into a
/// `PluginHttpResponse`; carries a description of the parse failure.
#[error("plugin reply parse error: {0}")]
ParseReply(String),
}
/// Builds a router from every plugin handle that declares an HTTP section.
///
/// Handles are visited in the map's key order (plugin id). Plugins whose
/// `http_section()` returns `None` are skipped.
///
/// Registration is best-effort: a plugin whose mount prefix is rejected by
/// [`PluginHttpRouter::register`] is left out of the router and the error is
/// discarded, so the returned router only contains mounts that were accepted.
pub fn build_router_from_handles(
    handles: &std::collections::BTreeMap<String, Arc<dyn DynPluginManifest>>,
) -> PluginHttpRouter {
    let mut router = PluginHttpRouter::new();
    for (plugin_id, handle) in handles {
        if let Some(section) = handle.http_section() {
            // Deliberately ignore the result: one plugin with an invalid
            // mount must not prevent the remaining plugins from being routed.
            let _ = router.register(plugin_id, &section.mount_prefix, section.timeout);
        }
    }
    router
}
/// Object-safe accessor for the HTTP-related part of a plugin's manifest,
/// usable behind `Arc<dyn DynPluginManifest>`.
pub trait DynPluginManifest: Send + Sync {
/// Returns the plugin's HTTP section, or `None` when the plugin declares no
/// HTTP mount (such plugins are skipped by `build_router_from_handles`).
fn http_section(&self) -> Option<HttpSectionView>;
}
/// Owned snapshot of the HTTP settings a plugin declares, as consumed by
/// `build_router_from_handles`.
#[derive(Debug, Clone)]
pub struct HttpSectionView {
/// Path prefix the plugin wants to be mounted on.
pub mount_prefix: String,
/// Per-plugin request timeout; `None` lets the router apply its default.
pub timeout: Option<Duration>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Router holding two overlapping mounts: `/api` and the more specific
    /// `/api/v1`, registered in that order.
    fn overlapping_router() -> PluginHttpRouter {
        let mut router = PluginHttpRouter::new();
        router.register("plugin_short", "/api", None).unwrap();
        router.register("plugin_long", "/api/v1", None).unwrap();
        router
    }

    /// The most specific mount wins when several prefixes apply.
    #[test]
    fn match_path_uses_longest_prefix_first() {
        let router = overlapping_router();
        let (matched, _) = router.match_path("/api/v1/users").expect("matches");
        assert_eq!(matched, "plugin_long");
    }

    /// A path outside the specific mount still reaches the broader one.
    #[test]
    fn match_path_falls_back_to_shorter_prefix() {
        let router = overlapping_router();
        let (matched, _) = router.match_path("/api/v2/users").expect("matches");
        assert_eq!(matched, "plugin_short");
    }

    /// Paths that share no registered prefix yield no route.
    #[test]
    fn match_path_returns_none_for_no_match() {
        let mut router = PluginHttpRouter::new();
        router.register("plugin", "/whatsapp", None).unwrap();
        assert!(router.match_path("/foo").is_none());
        assert!(router.match_path("/").is_none());
    }

    /// Registering a plugin twice keeps only its latest mount.
    #[test]
    fn register_replaces_existing_entry_for_same_plugin() {
        let mut router = PluginHttpRouter::new();
        router.register("plugin", "/old", None).unwrap();
        router.register("plugin", "/new", None).unwrap();
        assert!(router.match_path("/old").is_none());
        assert!(router.match_path("/new").is_some());
    }

    /// An explicit timeout is returned unchanged on a match.
    #[test]
    fn register_applies_custom_timeout() {
        let custom = Duration::from_secs(120);
        let mut router = PluginHttpRouter::new();
        router.register("plugin", "/slow", Some(custom)).unwrap();
        let (_, timeout) = router.match_path("/slow/foo").unwrap();
        assert_eq!(timeout, Duration::from_secs(120));
    }

    /// Omitting the timeout falls back to `DEFAULT_TIMEOUT`.
    #[test]
    fn register_default_timeout_when_none() {
        let mut router = PluginHttpRouter::new();
        router.register("plugin", "/fast", None).unwrap();
        let (_, timeout) = router.match_path("/fast/foo").unwrap();
        assert_eq!(timeout, DEFAULT_TIMEOUT);
    }

    /// Every reserved prefix is refused verbatim.
    #[test]
    fn register_rejects_reserved_prefixes() {
        let mut router = PluginHttpRouter::new();
        for reserved in RESERVED_PREFIXES.iter().copied() {
            let result = router.register("evil_plugin", reserved, None);
            assert!(
                matches!(result, Err(RouteRegistrationError::Reserved { .. })),
                "expected reservation rejection for `{reserved}`, got {result:?}",
            );
        }
    }

    /// Sub-paths of a reserved prefix are refused as well.
    #[test]
    fn register_rejects_subpath_of_reserved() {
        let mut router = PluginHttpRouter::new();
        let outcome = router.register("evil_plugin", "/health/foo", None);
        assert!(matches!(
            outcome,
            Err(RouteRegistrationError::Reserved { .. })
        ));
    }

    /// Prefixes that merely start with the same characters as a reserved
    /// prefix, without a segment boundary, are allowed.
    #[test]
    fn register_accepts_prefixes_that_only_share_substring_with_reserved() {
        let mut router = PluginHttpRouter::new();
        let healthy = router.register("plugin", "/healthy", None);
        assert!(healthy.is_ok());
        let aggregator = router.register("plugin2", "/metrics-aggregator", None);
        assert!(aggregator.is_ok());
    }

    /// Body decoding round-trips and header lookup ignores ASCII case.
    #[test]
    fn plugin_http_response_decodes_body() {
        let encoded = base64::engine::general_purpose::STANDARD.encode("<html/>");
        let response = PluginHttpResponse {
            status: 200,
            headers: vec![("Content-Type".into(), "text/html".into())],
            body_base64: encoded,
        };
        assert_eq!(response.decoded_body(), b"<html/>");
        assert_eq!(response.header("content-type"), Some("text/html"));
        assert_eq!(response.header("CONTENT-TYPE"), Some("text/html"));
    }

    /// With nobody subscribed to the plugin topic, forwarding surfaces a
    /// broker error rather than a parse error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_when_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let no_headers: &[(String, String)] = &[];
        let no_body: &[u8] = &[];
        let outcome = forward_request(
            &broker,
            "plugin",
            "GET",
            "/whatsapp/pair",
            "",
            no_headers,
            no_body,
            Duration::from_millis(100),
        )
        .await;
        assert!(matches!(outcome, Err(PluginHttpForwardError::Broker(_))));
    }
}