Skip to main content

nexo_pairing/
plugin_admin.rs

1//! Phase 81.33.b.real Stage 4 — daemon-side plugin admin RPC
2//! router.
3//!
4//! Plugins declare `[plugin.admin] method_prefix = "nexo/admin/<id>/"`
5//! in their `nexo-plugin.toml`. The admin dispatcher checks the
6//! router for every incoming method; matches forward via broker
7//! JSON-RPC to the plugin's subprocess. Replaces the previous
8//! `.with_<plugin>_handle(Arc<dyn XxxHandle>)` typed builder
9//! pattern.
10//!
11//! See [`crate::plugin_http`] for the sibling HTTP router that
12//! shares this design pattern (longest-prefix-first matching +
13//! broker-RPC forwarding + reserved-prefix safety net).
14
15use std::time::Duration;
16
17use nexo_broker::{AnyBroker, BrokerHandle, Message};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20
/// Broker round-trip timeout applied to a route when its registration
/// does not supply one (see `PluginAdminRouter::register`).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
22
/// Admin method namespaces owned by the daemon itself.
///
/// [`PluginAdminRouter::register`] refuses any plugin prefix that
/// overlaps one of these entries, so the daemon's own
/// agents/credentials/pairing/etc. surfaces cannot be hijacked.
///
/// Overlap is checked in both directions: a requested prefix is
/// rejected when it starts with a reserved entry (equal to it or
/// nested below it) or when a reserved entry starts with the
/// requested prefix (the request would cover the reserved namespace).
pub const RESERVED_ADMIN_PREFIXES: &[&str] = &[
    "nexo/admin/agents/",
    "nexo/admin/credentials/",
    "nexo/admin/pairing/",
    "nexo/admin/llm/",
    "nexo/admin/channels/",
    "nexo/admin/tenants/",
    "nexo/admin/memory/",
    "nexo/admin/sessions/",
    "nexo/admin/snapshots/",
    "nexo/admin/policy/",
];
41
42/// Plugin reply shape. Mirrors the dispatcher's typed
43/// `AdminRpcResult` so the daemon can render the right status
44/// without per-plugin parsing.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct PluginAdminResponse {
47    /// `true` = success; `false` = error.
48    pub ok: bool,
49    /// Result payload when `ok = true`. Caller deserializes into
50    /// the typed response shape (per `nexo-tool-meta::admin`).
51    #[serde(default)]
52    pub result: Value,
53    /// Error message when `ok = false`.
54    #[serde(default)]
55    pub error: String,
56}
57
/// One registered plugin admin route, as stored inside the router.
#[derive(Debug, Clone)]
struct Route {
    /// Admin method prefix this route answers for.
    method_prefix: String,
    /// Broker topic prefix the forwarded request is published under.
    broker_topic_prefix: String,
    /// Identifier of the owning plugin; at most one route per id.
    plugin_id: String,
    /// Broker round-trip timeout used when forwarding to this plugin.
    timeout: Duration,
}
65
66/// Longest-prefix-first matcher with interior mutability so the
67/// daemon can construct an empty router at admin-bootstrap time
68/// and populate it AFTER `wire_plugin_registry` returns the
69/// plugin handles.
70#[derive(Debug, Default)]
71pub struct PluginAdminRouter {
72    routes: std::sync::RwLock<Vec<Route>>,
73}
74
75impl PluginAdminRouter {
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Register a plugin's admin prefix. Rejects registrations
81    /// colliding with [`RESERVED_ADMIN_PREFIXES`]. Duplicate
82    /// `plugin_id` overrides previous entries (hot-spawn restart).
83    pub fn register(
84        &self,
85        plugin_id: &str,
86        method_prefix: &str,
87        broker_topic_prefix: &str,
88        timeout: Option<Duration>,
89    ) -> Result<(), AdminRouteRegistrationError> {
90        for reserved in RESERVED_ADMIN_PREFIXES {
91            if method_prefix == *reserved
92                || method_prefix.starts_with(reserved)
93                || reserved.starts_with(method_prefix)
94            {
95                return Err(AdminRouteRegistrationError::Reserved {
96                    requested: method_prefix.to_string(),
97                    reserved: (*reserved).to_string(),
98                });
99            }
100        }
101        let mut routes = self.routes.write().expect("router lock poisoned");
102        routes.retain(|r| r.plugin_id != plugin_id);
103        routes.push(Route {
104            method_prefix: method_prefix.to_string(),
105            broker_topic_prefix: broker_topic_prefix.to_string(),
106            plugin_id: plugin_id.to_string(),
107            timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
108        });
109        routes.sort_by(|a, b| {
110            b.method_prefix
111                .len()
112                .cmp(&a.method_prefix.len())
113                .then_with(|| a.plugin_id.cmp(&b.plugin_id))
114        });
115        Ok(())
116    }
117
118    /// Match an admin method against registered prefixes.
119    /// Returns the dispatch metadata needed by
120    /// [`forward_request`]. The returned [`MatchInfo`] is owned
121    /// (not borrowing the router) so the caller can drop the
122    /// router lock immediately.
123    pub fn match_method(&self, method: &str) -> Option<MatchInfo> {
124        let routes = self.routes.read().expect("router lock poisoned");
125        routes
126            .iter()
127            .find(|r| method.starts_with(&r.method_prefix))
128            .map(|r| MatchInfo {
129                plugin_id: r.plugin_id.clone(),
130                broker_topic_prefix: r.broker_topic_prefix.clone(),
131                method_prefix: r.method_prefix.clone(),
132                timeout: r.timeout,
133            })
134    }
135
136    pub fn is_empty(&self) -> bool {
137        self.routes.read().expect("router lock poisoned").is_empty()
138    }
139}
140
/// Dispatch metadata produced by [`PluginAdminRouter::match_method`].
///
/// Every field is owned, so the value does not borrow from the router
/// and no router lock has to be held during the async broker
/// round-trip that follows.
#[derive(Debug, Clone)]
pub struct MatchInfo {
    /// Plugin that owns the matched route.
    pub plugin_id: String,
    /// Broker topic prefix the request is published under.
    pub broker_topic_prefix: String,
    /// The registered method prefix that matched.
    pub method_prefix: String,
    /// Timeout to apply to the broker request.
    pub timeout: Duration,
}
152
/// Derive the broker subject suffix for an admin method.
///
/// Removes `prefix` from the front of `method` and turns every `/` in
/// what remains into `.`. Example: method
/// `nexo/admin/whatsapp/bot/list` with prefix `nexo/admin/whatsapp/`
/// leaves `bot/list`, which becomes `bot.list`.
///
/// If `method` does not start with `prefix`, the whole method is
/// converted instead, so the function never fails.
pub fn method_to_broker_suffix(method: &str, prefix: &str) -> String {
    let tail = match method.strip_prefix(prefix) {
        Some(rest) => rest,
        None => method,
    };
    tail.chars()
        .map(|ch| if ch == '/' { '.' } else { ch })
        .collect()
}
163
164/// Forward an admin method invocation to the plugin via broker
165/// JSON-RPC. Caller passes pre-parsed JSON params + parses the
166/// returned [`PluginAdminResponse`] into the typed shape.
167pub async fn forward_request(
168    broker: &AnyBroker,
169    info: MatchInfo,
170    method: &str,
171    params: Value,
172) -> Result<PluginAdminResponse, PluginAdminForwardError> {
173    let suffix = method_to_broker_suffix(method, &info.method_prefix);
174    let topic = format!("{}.{suffix}", info.broker_topic_prefix);
175    let payload = json!({ "method": method, "params": params });
176    let msg = Message::new(topic.clone(), payload);
177    let reply = broker
178        .request(&topic, msg, info.timeout)
179        .await
180        .map_err(|e| PluginAdminForwardError::Broker(e.to_string()))?;
181    serde_json::from_value::<PluginAdminResponse>(reply.payload).map_err(|e| {
182        PluginAdminForwardError::ParseReply(format!(
183            "plugin {} returned malformed admin reply: {e}",
184            info.plugin_id
185        ))
186    })
187}
188
189/// Route registration error. Returned by
190/// [`PluginAdminRouter::register`].
191#[derive(Debug, thiserror::Error)]
192pub enum AdminRouteRegistrationError {
193    #[error(
194        "method_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`"
195    )]
196    Reserved {
197        requested: String,
198        reserved: String,
199    },
200}
201
202/// Typed forwarder errors.
203#[derive(Debug, thiserror::Error)]
204pub enum PluginAdminForwardError {
205    #[error("broker error: {0}")]
206    Broker(String),
207    #[error("plugin reply parse error: {0}")]
208    ParseReply(String),
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Router holding a general WhatsApp route (`wa`) plus a more
    /// specific `bot/` route (`wa_bot`), shared by the matching tests.
    fn whatsapp_router() -> PluginAdminRouter {
        let r = PluginAdminRouter::new();
        r.register("wa", "nexo/admin/whatsapp/", "plugin.whatsapp.admin", None)
            .unwrap();
        r.register(
            "wa_bot",
            "nexo/admin/whatsapp/bot/",
            "plugin.whatsapp.bot",
            None,
        )
        .unwrap();
        r
    }

    #[test]
    fn match_method_uses_longest_prefix_first() {
        let r = whatsapp_router();
        let m = r
            .match_method("nexo/admin/whatsapp/bot/list")
            .expect("matches");
        assert_eq!(m.plugin_id, "wa_bot");
    }

    #[test]
    fn match_method_falls_back_to_shorter_prefix() {
        let r = whatsapp_router();
        let m = r
            .match_method("nexo/admin/whatsapp/session/qr")
            .expect("matches");
        assert_eq!(m.plugin_id, "wa");
    }

    #[test]
    fn match_method_returns_none_on_miss() {
        let r = PluginAdminRouter::new();
        r.register("wa", "nexo/admin/whatsapp/", "plugin.whatsapp.admin", None)
            .unwrap();
        assert!(r.match_method("nexo/admin/agents/list").is_none());
    }

    #[test]
    fn register_rejects_reserved_prefixes() {
        let r = PluginAdminRouter::new();
        for reserved in RESERVED_ADMIN_PREFIXES {
            let result = r.register("evil", reserved, "plugin.evil", None);
            assert!(
                matches!(result, Err(AdminRouteRegistrationError::Reserved { .. })),
                "expected rejection for `{reserved}`",
            );
        }
        // Rejected registrations must not leave a route behind.
        assert!(r.is_empty());
    }

    #[test]
    fn register_rejects_subpath_of_reserved() {
        let r = PluginAdminRouter::new();
        let result = r.register("evil", "nexo/admin/agents/sneaky/", "plugin.evil", None);
        assert!(matches!(
            result,
            Err(AdminRouteRegistrationError::Reserved { .. })
        ));
    }

    #[test]
    fn register_rejects_super_prefix_of_reserved() {
        // A requested prefix that is itself a textual prefix of a
        // reserved namespace would also match methods inside that
        // namespace, so it must be rejected:
        //   * `nexo/admin/` covers every reserved prefix;
        //   * `nexo/admin/age` (no trailing slash) is a prefix of
        //     `nexo/admin/agents/`.
        // By contrast `nexo/admin/age/` — with the trailing slash —
        // is NOT a prefix of `nexo/admin/agents/` and is a distinct
        // namespace, so it is not exercised here.
        let r = PluginAdminRouter::new();
        for requested in &["nexo/admin/", "nexo/admin/age"] {
            let result = r.register("evil", requested, "plugin.evil", None);
            assert!(
                matches!(result, Err(AdminRouteRegistrationError::Reserved { .. })),
                "expected rejection for `{requested}`",
            );
        }
    }

    #[test]
    fn method_to_broker_suffix_replaces_slashes_with_dots() {
        let suffix =
            method_to_broker_suffix("nexo/admin/whatsapp/bot/list", "nexo/admin/whatsapp/");
        assert_eq!(suffix, "bot.list");
    }

    #[test]
    fn method_to_broker_suffix_falls_back_when_prefix_missing() {
        // Defensive — should never happen if the router matched, but
        // confirms the function converts the whole method instead of
        // panicking.
        let suffix =
            method_to_broker_suffix("nexo/admin/whatsapp/bot/list", "nexo/admin/telegram/");
        assert_eq!(suffix, "nexo.admin.whatsapp.bot.list");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_with_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let router = PluginAdminRouter::new();
        router
            .register(
                "wa",
                "nexo/admin/whatsapp/",
                "plugin.whatsapp.admin",
                // Short timeout keeps the test fast: nothing is
                // subscribed, so the request can only fail.
                Some(Duration::from_millis(100)),
            )
            .unwrap();
        let info = router
            .match_method("nexo/admin/whatsapp/bot/list")
            .unwrap();
        let result = forward_request(
            &broker,
            info,
            "nexo/admin/whatsapp/bot/list",
            json!({}),
        )
        .await;
        assert!(matches!(result, Err(PluginAdminForwardError::Broker(_))));
    }
}