use std::time::Duration;
use nexo_broker::{AnyBroker, BrokerHandle, Message};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
/// Timeout stored on a route when `PluginAdminRouter::register` is called
/// with `timeout: None`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Method prefixes that plugins are not allowed to claim.
///
/// `PluginAdminRouter::register` rejects a `method_prefix` that starts with
/// one of these entries, or that one of these entries starts with. Entries
/// are checked in the order listed; the first overlapping entry is the one
/// reported in the error.
pub const RESERVED_ADMIN_PREFIXES: &[&str] = &[
"nexo/admin/agents/",
"nexo/admin/credentials/",
"nexo/admin/pairing/",
"nexo/admin/llm/",
"nexo/admin/channels/",
"nexo/admin/tenants/",
"nexo/admin/memory/",
"nexo/admin/sessions/",
"nexo/admin/snapshots/",
"nexo/admin/policy/",
];
/// Reply envelope that `forward_request` deserializes from the payload of a
/// plugin's broker reply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginAdminResponse {
/// Required field (no serde default). Presumably `true` means the plugin
/// handled the call successfully — confirm against the plugin contract.
pub ok: bool,
/// Result payload; falls back to `Value::default()` (JSON `null`) when the
/// reply omits the field.
#[serde(default)]
pub result: Value,
/// Error text; falls back to the empty string when the reply omits the
/// field.
#[serde(default)]
pub error: String,
}
/// One entry in the router's table, created by `PluginAdminRouter::register`.
#[derive(Debug, Clone)]
struct Route {
// Incoming methods are tested against this with `starts_with`.
method_prefix: String,
// Copied into `MatchInfo`; `forward_request` joins it with the
// method-derived suffix using a `.`.
broker_topic_prefix: String,
// Owner of the route; `register` keeps at most one route per id.
plugin_id: String,
// Copied into `MatchInfo` and handed to the broker request as its timeout.
timeout: Duration,
}
/// Table of plugin admin routes, guarded by a `std::sync::RwLock` so it can
/// be read and updated through `&self`.
#[derive(Debug, Default)]
pub struct PluginAdminRouter {
// `register` re-sorts this after every insert: longest `method_prefix`
// first, then `plugin_id` ascending.
routes: std::sync::RwLock<Vec<Route>>,
}
impl PluginAdminRouter {
    /// Creates a router with an empty route table.
    pub fn new() -> Self {
        Self {
            routes: std::sync::RwLock::new(Vec::new()),
        }
    }

    /// Installs the route for `plugin_id`, replacing any route that plugin
    /// already had.
    ///
    /// `timeout` falls back to `DEFAULT_TIMEOUT` when `None`. After the
    /// insert the table is ordered longest `method_prefix` first (ties broken
    /// by ascending `plugin_id`), which is what lets `match_method` return
    /// the most specific route by taking the first hit.
    ///
    /// # Errors
    ///
    /// Returns `AdminRouteRegistrationError::Reserved` when `method_prefix`
    /// overlaps an entry of `RESERVED_ADMIN_PREFIXES` in either direction.
    /// The table is left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn register(
        &self,
        plugin_id: &str,
        method_prefix: &str,
        broker_topic_prefix: &str,
        timeout: Option<Duration>,
    ) -> Result<(), AdminRouteRegistrationError> {
        // Overlap in either direction is a collision; equality is covered by
        // `starts_with`. The first overlapping entry is the one reported.
        let collision = RESERVED_ADMIN_PREFIXES.iter().copied().find(|&reserved| {
            method_prefix.starts_with(reserved) || reserved.starts_with(method_prefix)
        });
        if let Some(reserved) = collision {
            return Err(AdminRouteRegistrationError::Reserved {
                requested: method_prefix.to_owned(),
                reserved: reserved.to_owned(),
            });
        }

        let replacement = Route {
            method_prefix: method_prefix.to_owned(),
            broker_topic_prefix: broker_topic_prefix.to_owned(),
            plugin_id: plugin_id.to_owned(),
            timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
        };

        let mut table = self.routes.write().expect("router lock poisoned");
        // One route per plugin: drop whatever this plugin registered before.
        table.retain(|existing| existing.plugin_id != plugin_id);
        table.push(replacement);
        table.sort_by(|lhs, rhs| {
            // Descending by prefix length, then ascending by plugin id.
            match rhs.method_prefix.len().cmp(&lhs.method_prefix.len()) {
                std::cmp::Ordering::Equal => lhs.plugin_id.cmp(&rhs.plugin_id),
                unequal => unequal,
            }
        });
        Ok(())
    }

    /// Looks up the route for `method`.
    ///
    /// Returns an owned copy of the first route, in table order, whose
    /// `method_prefix` is a prefix of `method`, or `None` when no route
    /// matches.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn match_method(&self, method: &str) -> Option<MatchInfo> {
        let table = self.routes.read().expect("router lock poisoned");
        let hit = table
            .iter()
            .find(|route| method.starts_with(route.method_prefix.as_str()))?;
        Some(MatchInfo {
            plugin_id: hit.plugin_id.clone(),
            broker_topic_prefix: hit.broker_topic_prefix.clone(),
            method_prefix: hit.method_prefix.clone(),
            timeout: hit.timeout,
        })
    }

    /// Reports whether no route is currently registered.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn is_empty(&self) -> bool {
        let table = self.routes.read().expect("router lock poisoned");
        table.is_empty()
    }
}
/// Owned copy of a matched route, returned by
/// `PluginAdminRouter::match_method` and consumed by `forward_request`.
#[derive(Debug, Clone)]
pub struct MatchInfo {
/// Id of the plugin that registered the route; used in parse-error text.
pub plugin_id: String,
/// Topic prefix that `forward_request` joins with the method-derived
/// suffix using a `.`.
pub broker_topic_prefix: String,
/// The registered prefix that matched; `forward_request` strips it from
/// the method before building the topic suffix.
pub method_prefix: String,
/// Timeout passed to the broker request.
pub timeout: Duration,
}
/// Converts an admin method name into the dotted suffix of a broker topic.
///
/// `prefix` is removed from the front of `method` when present; otherwise the
/// whole method is used. Any `/` left at the front of the remainder is
/// dropped, and every remaining `/` becomes `.`.
///
/// For example, `("nexo/admin/whatsapp/bot/list", "nexo/admin/whatsapp/")`
/// yields `"bot.list"`, and so does the same method with the prefix
/// `"nexo/admin/whatsapp"`.
pub fn method_to_broker_suffix(method: &str, prefix: &str) -> String {
    let remainder = method.strip_prefix(prefix).unwrap_or(method);
    // A prefix registered without its trailing `/` leaves that separator at
    // the head of the remainder. Drop it so the suffix never begins with an
    // empty segment: callers join it as `{topic_prefix}.{suffix}`, and a
    // leading `.` would produce `..` in the topic.
    remainder.trim_start_matches('/').replace('/', ".")
}
pub async fn forward_request(
broker: &AnyBroker,
info: MatchInfo,
method: &str,
params: Value,
) -> Result<PluginAdminResponse, PluginAdminForwardError> {
let suffix = method_to_broker_suffix(method, &info.method_prefix);
let topic = format!("{}.{suffix}", info.broker_topic_prefix);
let payload = json!({ "method": method, "params": params });
let msg = Message::new(topic.clone(), payload);
let reply = broker
.request(&topic, msg, info.timeout)
.await
.map_err(|e| PluginAdminForwardError::Broker(e.to_string()))?;
serde_json::from_value::<PluginAdminResponse>(reply.payload).map_err(|e| {
PluginAdminForwardError::ParseReply(format!(
"plugin {} returned malformed admin reply: {e}",
info.plugin_id
))
})
}
/// Failure returned by `PluginAdminRouter::register`.
#[derive(Debug, thiserror::Error)]
pub enum AdminRouteRegistrationError {
/// The requested prefix overlaps an entry of `RESERVED_ADMIN_PREFIXES`:
/// one of the two strings starts with the other. `requested` is the prefix
/// passed to `register`; `reserved` is the first entry it overlapped.
#[error("method_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`")]
Reserved { requested: String, reserved: String },
}
/// Failure returned by `forward_request`.
#[derive(Debug, thiserror::Error)]
pub enum PluginAdminForwardError {
/// The broker request returned an error; holds that error rendered with
/// `to_string()`.
#[error("broker error: {0}")]
Broker(String),
/// The reply payload did not deserialize into `PluginAdminResponse`; holds
/// a message naming the plugin and the deserialization error.
#[error("plugin reply parse error: {0}")]
ParseReply(String),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `(plugin_id, method_prefix, broker_topic_prefix)` of the top-level
    /// WhatsApp route used throughout these tests.
    const WA: (&str, &str, &str) = ("wa", "nexo/admin/whatsapp/", "plugin.whatsapp.admin");

    /// Same shape as `WA`, for the more specific bot route nested under it.
    const WA_BOT: (&str, &str, &str) = (
        "wa_bot",
        "nexo/admin/whatsapp/bot/",
        "plugin.whatsapp.bot",
    );

    /// Builds a router and registers each triple, in order, with no explicit
    /// timeout.
    fn router_with(routes: &[(&str, &str, &str)]) -> PluginAdminRouter {
        let router = PluginAdminRouter::new();
        for &(plugin_id, method_prefix, topic_prefix) in routes {
            router
                .register(plugin_id, method_prefix, topic_prefix, None)
                .unwrap();
        }
        router
    }

    /// Tries to register `method_prefix` for a throwaway plugin on a fresh
    /// router and hands back the outcome.
    fn try_register_evil(method_prefix: &str) -> Result<(), AdminRouteRegistrationError> {
        PluginAdminRouter::new().register("evil", method_prefix, "plugin.evil", None)
    }

    /// With nested prefixes registered, the most specific one wins.
    #[test]
    fn match_method_uses_longest_prefix_first() {
        let router = router_with(&[WA, WA_BOT]);
        let hit = router
            .match_method("nexo/admin/whatsapp/bot/list")
            .expect("matches");
        assert_eq!(hit.plugin_id, "wa_bot");
    }

    /// A method outside the nested prefix still reaches the broader route.
    #[test]
    fn match_method_falls_back_to_shorter_prefix() {
        let router = router_with(&[WA, WA_BOT]);
        let hit = router
            .match_method("nexo/admin/whatsapp/session/qr")
            .expect("matches");
        assert_eq!(hit.plugin_id, "wa");
    }

    /// Methods that no registered prefix covers produce no match.
    #[test]
    fn match_method_returns_none_on_miss() {
        let router = router_with(&[WA]);
        assert!(router.match_method("nexo/admin/agents/list").is_none());
    }

    /// Every reserved prefix, taken verbatim, is refused.
    #[test]
    fn register_rejects_reserved_prefixes() {
        let router = PluginAdminRouter::new();
        for reserved in RESERVED_ADMIN_PREFIXES {
            let outcome = router.register("evil", reserved, "plugin.evil", None);
            assert!(
                matches!(outcome, Err(AdminRouteRegistrationError::Reserved { .. })),
                "expected rejection for `{reserved}`",
            );
        }
    }

    /// A prefix nested below a reserved one is refused.
    #[test]
    fn register_rejects_subpath_of_reserved() {
        let outcome = try_register_evil("nexo/admin/agents/sneaky/");
        assert!(matches!(
            outcome,
            Err(AdminRouteRegistrationError::Reserved { .. })
        ));
    }

    /// A prefix broad enough to swallow a reserved one is refused.
    #[test]
    fn register_rejects_super_prefix_of_reserved() {
        let outcome = try_register_evil("nexo/admin/");
        assert!(matches!(
            outcome,
            Err(AdminRouteRegistrationError::Reserved { .. })
        ));
    }

    /// The matched prefix is removed and `/` separators become `.`.
    #[test]
    fn method_to_broker_suffix_replaces_slashes_with_dots() {
        let method = "nexo/admin/whatsapp/bot/list";
        assert_eq!(
            method_to_broker_suffix(method, "nexo/admin/whatsapp/"),
            "bot.list"
        );
    }

    /// When the prefix does not apply, the whole method is converted.
    #[test]
    fn method_to_broker_suffix_falls_back_when_prefix_missing() {
        let method = "nexo/admin/whatsapp/bot/list";
        assert_eq!(
            method_to_broker_suffix(method, "nexo/admin/telegram/"),
            "nexo.admin.whatsapp.bot.list"
        );
    }

    /// Forwarding through a local broker with nobody subscribed surfaces as a
    /// broker error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_with_no_subscriber() {
        let method = "nexo/admin/whatsapp/bot/list";
        let (plugin_id, method_prefix, topic_prefix) = WA;

        let router = PluginAdminRouter::new();
        router
            .register(
                plugin_id,
                method_prefix,
                topic_prefix,
                Some(Duration::from_millis(100)),
            )
            .unwrap();
        let info = router.match_method(method).unwrap();

        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let outcome = forward_request(&broker, info, method, json!({})).await;
        assert!(matches!(outcome, Err(PluginAdminForwardError::Broker(_))));
    }
}