1use std::time::Duration;
16
17use nexo_broker::{AnyBroker, BrokerHandle, Message};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20
/// Request timeout stored on a route when `PluginAdminRouter::register` is
/// called with `timeout: None`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
22
/// Method prefixes that plugins may not claim.
///
/// `PluginAdminRouter::register` rejects a requested prefix when it starts
/// with one of these entries or when one of these entries starts with it
/// (which also covers exact equality). Entries are checked in the order
/// listed here, and the first overlapping entry is the one reported in the
/// resulting error.
pub const RESERVED_ADMIN_PREFIXES: &[&str] = &[
    "nexo/admin/agents/",
    "nexo/admin/credentials/",
    "nexo/admin/pairing/",
    "nexo/admin/llm/",
    "nexo/admin/channels/",
    "nexo/admin/tenants/",
    "nexo/admin/memory/",
    "nexo/admin/sessions/",
    "nexo/admin/snapshots/",
    "nexo/admin/policy/",
];
41
/// Reply shape expected back from a plugin for a forwarded admin request.
///
/// `forward_request` deserializes the broker reply payload into this type;
/// a payload that does not fit is reported as
/// `PluginAdminForwardError::ParseReply`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginAdminResponse {
    /// Status flag supplied by the plugin. This field has no serde default,
    /// so a reply without it fails to deserialize.
    /// NOTE(review): presumably `true` means success — this file never
    /// interprets the flag, confirm against the plugin contract.
    pub ok: bool,
    /// Plugin-provided result value; falls back to `Value`'s default
    /// (JSON `null`) when the reply omits it.
    #[serde(default)]
    pub result: Value,
    /// Plugin-provided error text; falls back to the empty string when the
    /// reply omits it.
    #[serde(default)]
    pub error: String,
}
57
/// One registered mapping from an admin method prefix to a plugin.
#[derive(Debug, Clone)]
struct Route {
    /// Prefix compared against incoming method names with `starts_with`.
    method_prefix: String,
    /// Broker topic prefix copied into `MatchInfo` for a matching method.
    broker_topic_prefix: String,
    /// Owner of the route; `register` drops existing routes with the same id.
    plugin_id: String,
    /// Timeout copied into `MatchInfo` for a matching method.
    timeout: Duration,
}
65
/// Registry of plugin-owned admin method prefixes.
///
/// The route list sits behind a `std::sync::RwLock`, so registration and
/// lookup both work through `&self`. `register` re-sorts the list after every
/// insertion (longest `method_prefix` first), which is what lets
/// `match_method` return the first hit.
#[derive(Debug, Default)]
pub struct PluginAdminRouter {
    routes: std::sync::RwLock<Vec<Route>>,
}
74
75impl PluginAdminRouter {
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn register(
84 &self,
85 plugin_id: &str,
86 method_prefix: &str,
87 broker_topic_prefix: &str,
88 timeout: Option<Duration>,
89 ) -> Result<(), AdminRouteRegistrationError> {
90 for reserved in RESERVED_ADMIN_PREFIXES {
91 if method_prefix == *reserved
92 || method_prefix.starts_with(reserved)
93 || reserved.starts_with(method_prefix)
94 {
95 return Err(AdminRouteRegistrationError::Reserved {
96 requested: method_prefix.to_string(),
97 reserved: (*reserved).to_string(),
98 });
99 }
100 }
101 let mut routes = self.routes.write().expect("router lock poisoned");
102 routes.retain(|r| r.plugin_id != plugin_id);
103 routes.push(Route {
104 method_prefix: method_prefix.to_string(),
105 broker_topic_prefix: broker_topic_prefix.to_string(),
106 plugin_id: plugin_id.to_string(),
107 timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
108 });
109 routes.sort_by(|a, b| {
110 b.method_prefix
111 .len()
112 .cmp(&a.method_prefix.len())
113 .then_with(|| a.plugin_id.cmp(&b.plugin_id))
114 });
115 Ok(())
116 }
117
118 pub fn match_method(&self, method: &str) -> Option<MatchInfo> {
124 let routes = self.routes.read().expect("router lock poisoned");
125 routes
126 .iter()
127 .find(|r| method.starts_with(&r.method_prefix))
128 .map(|r| MatchInfo {
129 plugin_id: r.plugin_id.clone(),
130 broker_topic_prefix: r.broker_topic_prefix.clone(),
131 method_prefix: r.method_prefix.clone(),
132 timeout: r.timeout,
133 })
134 }
135
136 pub fn is_empty(&self) -> bool {
137 self.routes.read().expect("router lock poisoned").is_empty()
138 }
139}
140
/// Owned snapshot of the route selected by `PluginAdminRouter::match_method`.
///
/// All fields are copies of the matched route, so the value can be used
/// (for example passed to `forward_request`) without holding the router lock.
/// Implements `PartialEq`/`Eq` so match results can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MatchInfo {
    /// Id of the plugin that registered the matching route.
    pub plugin_id: String,
    /// Topic prefix that `forward_request` joins with the method suffix.
    pub broker_topic_prefix: String,
    /// The registered prefix that matched the method.
    pub method_prefix: String,
    /// Timeout `forward_request` passes to the broker request.
    pub timeout: Duration,
}
152
/// Converts the part of `method` after `prefix` into a dot-separated suffix.
///
/// If `method` does not start with `prefix`, the whole `method` is converted
/// instead. Every `/` becomes `.`; nothing else is altered, so empty path
/// segments (leading, trailing or doubled slashes) produce leading, trailing
/// or doubled dots, and a `method` equal to `prefix` yields the empty string.
pub fn method_to_broker_suffix(method: &str, prefix: &str) -> String {
    let tail = match method.strip_prefix(prefix) {
        Some(rest) => rest,
        None => method,
    };
    // Splitting on '/' and re-joining with '.' swaps exactly the separators.
    tail.split('/').collect::<Vec<_>>().join(".")
}
163
164pub async fn forward_request(
168 broker: &AnyBroker,
169 info: MatchInfo,
170 method: &str,
171 params: Value,
172) -> Result<PluginAdminResponse, PluginAdminForwardError> {
173 let suffix = method_to_broker_suffix(method, &info.method_prefix);
174 let topic = format!("{}.{suffix}", info.broker_topic_prefix);
175 let payload = json!({ "method": method, "params": params });
176 let msg = Message::new(topic.clone(), payload);
177 let reply = broker
178 .request(&topic, msg, info.timeout)
179 .await
180 .map_err(|e| PluginAdminForwardError::Broker(e.to_string()))?;
181 serde_json::from_value::<PluginAdminResponse>(reply.payload).map_err(|e| {
182 PluginAdminForwardError::ParseReply(format!(
183 "plugin {} returned malformed admin reply: {e}",
184 info.plugin_id
185 ))
186 })
187}
188
/// Reasons `PluginAdminRouter::register` refuses a route.
#[derive(Debug, thiserror::Error)]
pub enum AdminRouteRegistrationError {
    /// The requested prefix overlaps an entry of `RESERVED_ADMIN_PREFIXES`
    /// (it starts with the reserved prefix, or the reserved prefix starts
    /// with it).
    #[error(
        "method_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`"
    )]
    Reserved {
        /// The prefix the caller tried to register.
        requested: String,
        /// The first reserved prefix it overlapped.
        reserved: String,
    },
}
201
/// Failures reported by `forward_request`.
#[derive(Debug, thiserror::Error)]
pub enum PluginAdminForwardError {
    /// The broker request itself failed; holds the broker error rendered as
    /// a string.
    #[error("broker error: {0}")]
    Broker(String),
    /// The plugin replied, but the payload did not deserialize into
    /// `PluginAdminResponse`; holds a description including the plugin id.
    #[error("plugin reply parse error: {0}")]
    ParseReply(String),
}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Registers the general whatsapp route under plugin id `wa`.
    fn register_whatsapp(router: &PluginAdminRouter, timeout: Option<Duration>) {
        router
            .register(
                "wa",
                "nexo/admin/whatsapp/",
                "plugin.whatsapp.admin",
                timeout,
            )
            .unwrap();
    }

    /// Registers the more specific whatsapp bot route under plugin id `wa_bot`.
    fn register_whatsapp_bot(router: &PluginAdminRouter) {
        router
            .register(
                "wa_bot",
                "nexo/admin/whatsapp/bot/",
                "plugin.whatsapp.bot",
                None,
            )
            .unwrap();
    }

    /// Router holding both the general and the nested whatsapp routes,
    /// registered shortest prefix first.
    fn nested_whatsapp_router() -> PluginAdminRouter {
        let router = PluginAdminRouter::new();
        register_whatsapp(&router, None);
        register_whatsapp_bot(&router);
        router
    }

    /// True when a registration attempt was refused as reserved.
    fn is_reserved_rejection(outcome: &Result<(), AdminRouteRegistrationError>) -> bool {
        matches!(outcome, Err(AdminRouteRegistrationError::Reserved { .. }))
    }

    #[test]
    fn match_method_uses_longest_prefix_first() {
        let router = nested_whatsapp_router();
        let hit = router
            .match_method("nexo/admin/whatsapp/bot/list")
            .expect("matches");
        assert_eq!(hit.plugin_id, "wa_bot");
    }

    #[test]
    fn match_method_falls_back_to_shorter_prefix() {
        let router = nested_whatsapp_router();
        let hit = router
            .match_method("nexo/admin/whatsapp/session/qr")
            .expect("matches");
        assert_eq!(hit.plugin_id, "wa");
    }

    #[test]
    fn match_method_returns_none_on_miss() {
        let router = PluginAdminRouter::new();
        register_whatsapp(&router, None);
        assert!(router.match_method("nexo/admin/agents/list").is_none());
    }

    #[test]
    fn register_rejects_reserved_prefixes() {
        let router = PluginAdminRouter::new();
        for reserved in RESERVED_ADMIN_PREFIXES {
            let outcome = router.register("evil", reserved, "plugin.evil", None);
            assert!(
                is_reserved_rejection(&outcome),
                "expected rejection for `{reserved}`",
            );
        }
    }

    #[test]
    fn register_rejects_subpath_of_reserved() {
        let router = PluginAdminRouter::new();
        let outcome = router.register(
            "evil",
            "nexo/admin/agents/sneaky/",
            "plugin.evil",
            None,
        );
        assert!(is_reserved_rejection(&outcome));
    }

    #[test]
    fn register_rejects_super_prefix_of_reserved() {
        let router = PluginAdminRouter::new();
        // A prefix shorter than a reserved one would swallow it entirely.
        let outcome = router.register("evil", "nexo/admin/", "plugin.evil", None);
        assert!(is_reserved_rejection(&outcome));
    }

    #[test]
    fn method_to_broker_suffix_replaces_slashes_with_dots() {
        assert_eq!(
            method_to_broker_suffix(
                "nexo/admin/whatsapp/bot/list",
                "nexo/admin/whatsapp/",
            ),
            "bot.list"
        );
    }

    #[test]
    fn method_to_broker_suffix_falls_back_when_prefix_missing() {
        // Prefix does not match, so the whole method is converted.
        assert_eq!(
            method_to_broker_suffix(
                "nexo/admin/whatsapp/bot/list",
                "nexo/admin/telegram/",
            ),
            "nexo.admin.whatsapp.bot.list"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_with_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let router = PluginAdminRouter::new();
        // Short timeout: nothing is subscribed, so the request cannot succeed.
        register_whatsapp(&router, Some(Duration::from_millis(100)));

        let method = "nexo/admin/whatsapp/bot/list";
        let info = router.match_method(method).unwrap();
        let outcome = forward_request(&broker, info, method, json!({})).await;
        assert!(matches!(outcome, Err(PluginAdminForwardError::Broker(_))));
    }
}