Skip to main content

nexo_pairing/
plugin_http.rs

1//! Phase 81.33.b.real Stage 2 — daemon-side plugin HTTP proxy.
2//!
3//! Plugins declare `[plugin.http] mount_prefix = "/<prefix>"` in
4//! their `nexo-plugin.toml`. The daemon builds a
5//! [`PluginHttpRouter`] from every declaring plugin's manifest at
6//! boot, and the HTTP handler checks the router on every incoming
7//! request BEFORE the legacy hardcoded `/whatsapp/*` etc. paths.
8//!
9//! A match forwards the request to the plugin's subprocess via
10//! broker JSON-RPC (`plugin.<id>.http.request`), reads the reply,
11//! and writes it back to the TCP stream.
12//!
13//! ## Wire format
14//!
15//! See [`crate::plugin_http`] crate docs for the JSON shape.
16//! Briefly:
17//!
18//! - request payload:
19//!   `{ method, path, query, headers: [[k,v],…], body_base64 }`
20//! - reply payload:
21//!   `{ status, headers: [[k,v],…], body_base64 }`
22//!
23//! ## Limitations
24//!
25//! - No streaming responses (SSE, chunked transfer). Plugin must
26//!   buffer the full response before replying.
27//! - No WebSocket upgrades. WebSocket endpoints stay on
28//!   `[plugin.http_server]` (plugin binds its own port).
29//! - Body bytes are base64-encoded JSON; OK for HTML pages (≤100KB
30//!   typical) but wasteful for large uploads.
31
32use std::sync::Arc;
33use std::time::Duration;
34
35use base64::Engine;
36use nexo_broker::{AnyBroker, BrokerHandle, Message};
37use serde::{Deserialize, Serialize};
38use serde_json::json;
39
/// Timeout used for a route whose registration does not supply an
/// explicit one (see `PluginHttpRouter::register`).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
41
/// Plugin-side reply parsed from the broker RPC.
///
/// `forward_request` deserializes the broker reply payload into this
/// type. Only `status` is mandatory on the wire; `headers` and
/// `body_base64` fall back to empty values when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginHttpResponse {
    /// HTTP status code (e.g. 200, 404, 503). Plugin must supply
    /// — daemon does NOT default.
    pub status: u16,
    /// Optional response headers as `(name, value)` pairs.
    /// `Content-Type` recommended; daemon does NOT default-fill so
    /// the plugin owns the response shape completely.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Base64-encoded body bytes. Empty string = no body.
    #[serde(default)]
    pub body_base64: String,
}
57
58impl PluginHttpResponse {
59    /// Decode the response body. Returns empty vec on bad base64
60    /// (logged at warn level).
61    pub fn decoded_body(&self) -> Vec<u8> {
62        if self.body_base64.is_empty() {
63            return Vec::new();
64        }
65        base64::engine::general_purpose::STANDARD
66            .decode(self.body_base64.as_bytes())
67            .unwrap_or_default()
68    }
69
70    /// Pick a header value (case-insensitive). Returns first
71    /// match.
72    pub fn header(&self, name: &str) -> Option<&str> {
73        self.headers
74            .iter()
75            .find(|(k, _)| k.eq_ignore_ascii_case(name))
76            .map(|(_, v)| v.as_str())
77    }
78}
79
/// Daemon-side route entry: which plugin owns which prefix +
/// per-plugin timeout.
#[derive(Debug, Clone)]
struct Route {
    /// Path prefix the plugin asked to be mounted under.
    mount_prefix: String,
    /// Identifier of the owning plugin; the router keeps at most one
    /// route per `plugin_id`.
    plugin_id: String,
    /// Timeout handed back by `match_path` for requests on this route.
    timeout: Duration,
}
88
/// Daemon-reserved path prefixes. Plugin registrations whose
/// `mount_prefix` collides with any of these are rejected so a
/// malicious or buggy plugin cannot hijack health/metrics/admin
/// surfaces.
///
/// Order is not significant: [`PluginHttpRouter::register`] checks
/// the requested prefix against every entry. A registration is
/// rejected when the requested prefix equals an entry or is nested
/// under one (`/health/foo`); a sibling that merely shares leading
/// characters (`/healthy`) is allowed.
pub const RESERVED_PREFIXES: &[&str] = &["/health", "/metrics", "/pair", "/admin", "/.well-known"];
100
/// Longest-prefix-first matcher. Every successful
/// [`Self::register`] call re-sorts the routes, so prefix matching
/// is deterministic for nested prefixes (e.g. `/api/v1` wins over
/// `/api`).
#[derive(Debug, Clone, Default)]
pub struct PluginHttpRouter {
    /// Registered routes, kept ordered by descending prefix length
    /// (ties broken by `plugin_id`).
    routes: Vec<Route>,
}
109
110impl PluginHttpRouter {
111    pub fn new() -> Self {
112        Self::default()
113    }
114
115    /// Register a plugin's mount prefix. Duplicates are accepted
116    /// (last-write-wins per `plugin_id`), so a plugin that
117    /// re-registers (hot-spawn restart) replaces its own entry.
118    ///
119    /// Rejects registrations colliding with [`RESERVED_PREFIXES`]
120    /// — daemon-internal paths (`/health`, `/metrics`, `/pair`,
121    /// `/admin`, `/.well-known`) cannot be hijacked. Caller
122    /// surfaces a warn-level log for rejected registrations so
123    /// operators see the cause; the plugin's broker handler
124    /// stays unhooked from those routes.
125    pub fn register(
126        &mut self,
127        plugin_id: &str,
128        mount_prefix: &str,
129        timeout: Option<Duration>,
130    ) -> Result<(), RouteRegistrationError> {
131        for reserved in RESERVED_PREFIXES {
132            if mount_prefix == *reserved || mount_prefix.starts_with(&format!("{reserved}/")) {
133                return Err(RouteRegistrationError::Reserved {
134                    requested: mount_prefix.to_string(),
135                    reserved: (*reserved).to_string(),
136                });
137            }
138        }
139        // Drop any existing entry for the same plugin_id (live
140        // restart replaces).
141        self.routes.retain(|r| r.plugin_id != plugin_id);
142        self.routes.push(Route {
143            mount_prefix: mount_prefix.to_string(),
144            plugin_id: plugin_id.to_string(),
145            timeout: timeout.unwrap_or(DEFAULT_TIMEOUT),
146        });
147        self.sort();
148        Ok(())
149    }
150
151    /// Longest-prefix-first ordering. Required for deterministic
152    /// matching when prefixes nest (`/api/v1` MUST win over
153    /// `/api` when both are registered).
154    fn sort(&mut self) {
155        self.routes.sort_by(|a, b| {
156            b.mount_prefix
157                .len()
158                .cmp(&a.mount_prefix.len())
159                .then_with(|| a.plugin_id.cmp(&b.plugin_id))
160        });
161    }
162
163    /// Match a request path. Returns `(plugin_id, timeout)` if a
164    /// route covers it; the caller forwards via
165    /// [`forward_request`].
166    pub fn match_path(&self, path: &str) -> Option<(&str, Duration)> {
167        self.routes
168            .iter()
169            .find(|r| path.starts_with(&r.mount_prefix))
170            .map(|r| (r.plugin_id.as_str(), r.timeout))
171    }
172
173    pub fn is_empty(&self) -> bool {
174        self.routes.is_empty()
175    }
176}
177
178/// Forward a daemon-received HTTP request to the plugin via
179/// broker JSON-RPC.
180///
181/// The caller has already parsed `method + full_path` from the
182/// stream and snapshot any required headers / body bytes. This
183/// helper does NOT touch the TCP stream — the caller writes the
184/// response back.
185///
186/// On broker failure (timeout, transport error, malformed reply),
187/// returns a typed error so the caller can render a 502/504.
188pub async fn forward_request(
189    broker: &AnyBroker,
190    plugin_id: &str,
191    method: &str,
192    path: &str,
193    query: &str,
194    headers: &[(String, String)],
195    body: &[u8],
196    timeout: Duration,
197) -> Result<PluginHttpResponse, PluginHttpForwardError> {
198    let topic = format!("plugin.{plugin_id}.http.request");
199    let body_b64 = base64::engine::general_purpose::STANDARD.encode(body);
200    let payload = json!({
201        "method": method,
202        "path": path,
203        "query": query,
204        "headers": headers,
205        "body_base64": body_b64,
206    });
207    let msg = Message::new(topic.clone(), payload);
208    let reply = broker
209        .request(&topic, msg, timeout)
210        .await
211        .map_err(|e| PluginHttpForwardError::Broker(e.to_string()))?;
212    serde_json::from_value::<PluginHttpResponse>(reply.payload).map_err(|e| {
213        PluginHttpForwardError::ParseReply(format!(
214            "plugin {plugin_id} returned malformed http reply: {e}"
215        ))
216    })
217}
218
/// Route registration error. Returned by
/// [`PluginHttpRouter::register`] when the requested
/// `mount_prefix` violates a reserved-prefix or validation rule.
#[derive(Debug, thiserror::Error)]
pub enum RouteRegistrationError {
    /// The requested prefix collides with a daemon-reserved path.
    /// `requested` is the prefix as the plugin supplied it;
    /// `reserved` is the entry of [`RESERVED_PREFIXES`] it hit.
    #[error("mount_prefix `{requested}` collides with daemon-reserved prefix `{reserved}`")]
    Reserved { requested: String, reserved: String },
}
227
/// Typed forwarder errors. Caller renders the right status
/// code:
/// - `Broker` → 504 Gateway Timeout (or 502 for non-timeout).
/// - `ParseReply` → 502 Bad Gateway (plugin contract violation).
///
/// Both variants carry the failure as display text only.
#[derive(Debug, thiserror::Error)]
pub enum PluginHttpForwardError {
    /// The broker request itself failed; holds the broker error
    /// rendered via `to_string()`.
    #[error("broker error: {0}")]
    Broker(String),
    /// The plugin replied, but the payload did not deserialize into
    /// a [`PluginHttpResponse`].
    #[error("plugin reply parse error: {0}")]
    ParseReply(String),
}
239
240/// Build a router from a slice of `(plugin_id, manifest)`. Skips
241/// plugins whose manifest does not declare `[plugin.http]`.
242pub fn build_router_from_handles(
243    handles: &std::collections::BTreeMap<String, Arc<dyn DynPluginManifest>>,
244) -> PluginHttpRouter {
245    let mut router = PluginHttpRouter::new();
246    for (plugin_id, handle) in handles.iter() {
247        if let Some(section) = handle.http_section() {
248            let _ = router.register(plugin_id, &section.mount_prefix, section.timeout);
249        }
250    }
251    router
252}
253
/// Type-erased view of a plugin's manifest sufficient for router
/// construction. The real `NexoPlugin` trait lives in nexo-core;
/// pulling it into this crate would create a circular dep. The
/// caller (daemon) walks `wire.plugin_handles` and wraps each
/// handle in a small impl of this trait OR — simpler — passes a
/// `Vec<(plugin_id, mount_prefix, timeout)>` directly to
/// [`PluginHttpRouter::register`] in a loop.
///
/// We provide the trait as documentation for the contract; the
/// daemon uses the direct `register()` loop in practice.
pub trait DynPluginManifest: Send + Sync {
    /// The plugin's HTTP mount declaration, or `None` when the
    /// plugin declares none (such plugins are skipped by
    /// [`build_router_from_handles`]).
    fn http_section(&self) -> Option<HttpSectionView>;
}
267
/// View struct mirroring
/// [`nexo_plugin_manifest::http::PluginHttpSection`] but without
/// the cross-crate dep (router crate already depends on
/// nexo-plugin-manifest so this is mostly cosmetic — kept here so
/// downstream alternative impls can avoid the dep if needed).
#[derive(Debug, Clone)]
pub struct HttpSectionView {
    /// Path prefix the plugin wants to be mounted under.
    pub mount_prefix: String,
    /// Per-plugin request timeout; `None` lets the router apply its
    /// default.
    pub timeout: Option<Duration>,
}
278
// Unit tests for the router's matching/registration rules, the
// response helpers, and the forwarder's broker-failure path.
#[cfg(test)]
mod tests {
    use super::*;

    // Nested prefixes: the longer registration must win.
    #[test]
    fn match_path_uses_longest_prefix_first() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin_short", "/api", None).unwrap();
        r.register("plugin_long", "/api/v1", None).unwrap();
        let (id, _) = r.match_path("/api/v1/users").expect("matches");
        assert_eq!(id, "plugin_long");
    }

    // A path outside the longer prefix still reaches the shorter one.
    #[test]
    fn match_path_falls_back_to_shorter_prefix() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin_short", "/api", None).unwrap();
        r.register("plugin_long", "/api/v1", None).unwrap();
        let (id, _) = r.match_path("/api/v2/users").expect("matches");
        assert_eq!(id, "plugin_short");
    }

    // Unrelated paths (including the root) match nothing.
    #[test]
    fn match_path_returns_none_for_no_match() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin", "/whatsapp", None).unwrap();
        assert!(r.match_path("/foo").is_none());
        assert!(r.match_path("/").is_none());
    }

    // Re-registering the same plugin_id replaces its previous route.
    #[test]
    fn register_replaces_existing_entry_for_same_plugin() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin", "/old", None).unwrap();
        r.register("plugin", "/new", None).unwrap();
        assert!(r.match_path("/old").is_none());
        assert!(r.match_path("/new").is_some());
    }

    // An explicit timeout is returned verbatim by match_path.
    #[test]
    fn register_applies_custom_timeout() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin", "/slow", Some(Duration::from_secs(120)))
            .unwrap();
        let (_, t) = r.match_path("/slow/foo").unwrap();
        assert_eq!(t, Duration::from_secs(120));
    }

    // Omitting the timeout falls back to DEFAULT_TIMEOUT.
    #[test]
    fn register_default_timeout_when_none() {
        let mut r = PluginHttpRouter::new();
        r.register("plugin", "/fast", None).unwrap();
        let (_, t) = r.match_path("/fast/foo").unwrap();
        assert_eq!(t, DEFAULT_TIMEOUT);
    }

    // Every reserved prefix is rejected when requested exactly.
    #[test]
    fn register_rejects_reserved_prefixes() {
        let mut r = PluginHttpRouter::new();
        for reserved in RESERVED_PREFIXES {
            let result = r.register("evil_plugin", reserved, None);
            assert!(
                matches!(result, Err(RouteRegistrationError::Reserved { .. })),
                "expected reservation rejection for `{reserved}`, got {result:?}",
            );
        }
    }

    #[test]
    fn register_rejects_subpath_of_reserved() {
        let mut r = PluginHttpRouter::new();
        // `/health/foo` shadows the daemon's `/health` endpoint
        // if matched first — must be rejected.
        let result = r.register("evil_plugin", "/health/foo", None);
        assert!(matches!(
            result,
            Err(RouteRegistrationError::Reserved { .. })
        ));
    }

    #[test]
    fn register_accepts_prefixes_that_only_share_substring_with_reserved() {
        let mut r = PluginHttpRouter::new();
        // `/healthy` is NOT a subpath of `/health` — `/health/` is
        // the prefix that would shadow; `/healthy` is a sibling.
        assert!(r.register("plugin", "/healthy", None).is_ok());
        assert!(r.register("plugin2", "/metrics-aggregator", None).is_ok());
    }

    // Body round-trips through base64; header lookup ignores ASCII case.
    #[test]
    fn plugin_http_response_decodes_body() {
        let response = PluginHttpResponse {
            status: 200,
            headers: vec![("Content-Type".into(), "text/html".into())],
            body_base64: base64::engine::general_purpose::STANDARD.encode("<html/>"),
        };
        assert_eq!(response.decoded_body(), b"<html/>");
        assert_eq!(response.header("content-type"), Some("text/html"));
        assert_eq!(response.header("CONTENT-TYPE"), Some("text/html"));
    }

    // With nobody subscribed on the topic, the request must surface
    // as a Broker error rather than hang or panic.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn forward_request_returns_broker_error_when_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let result = forward_request(
            &broker,
            "plugin",
            "GET",
            "/whatsapp/pair",
            "",
            &[],
            &[],
            Duration::from_millis(100),
        )
        .await;
        assert!(matches!(result, Err(PluginHttpForwardError::Broker(_))));
    }
}