Skip to main content

nexo_pairing/
plugin_poller.rs

//! Phase 96 — daemon-side plugin poller router.
//!
//! Plugins declare `[plugin.poller] kinds = [...]` in their
//! `nexo-plugin.toml`. The daemon's poller runner consults this
//! router for every job in `pollers.yaml`: if the job's `kind`
//! belongs to a registered plugin, the router builds the broker
//! request topic `<broker_topic_prefix>.tick` and forwards the tick
//! payload via JSON-RPC; the subprocess replies with the same
//! `TickAck` shape the in-process trait returns.
//!
//! Mirrors [`crate::plugin_admin`] / [`crate::plugin_http`]:
//! interior mutability via `RwLock` so the daemon can construct
//! an empty router at boot and populate it AFTER the plugin
//! manifests have loaded and the supervisor has been wired.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use chrono::{DateTime, Utc};
21use nexo_broker::{AnyBroker, BrokerHandle, Message};
22use nexo_plugin_manifest::poller::PollerLifecycle;
23use nexo_poller::{PollContext, Poller, PollerError, TickAck, TickMetrics};
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26
27/// One registered plugin's poller capability.
28#[derive(Debug, Clone)]
29pub struct PluginPollerHandle {
30    pub plugin_id: String,
31    pub kinds: Vec<String>,
32    pub broker_topic_prefix: String,
33    pub lifecycle: PollerLifecycle,
34    pub max_concurrent_ticks: u32,
35    pub tick_timeout: Duration,
36    /// `[plugin.entrypoint].command` — binary path the daemon
37    /// spawns. Required for `ephemeral` lifecycle (each tick spawns
38    /// a fresh subprocess). Optional for `long_lived` (the broker
39    /// path doesn't need it).
40    pub entrypoint_command: Option<String>,
41}
42
43impl PluginPollerHandle {
44    pub fn tick_topic(&self) -> String {
45        format!("{}.tick", self.broker_topic_prefix)
46    }
47}
48
49/// Daemon-side router. Looks up which plugin owns a `kind` and
50/// forwards tick requests via broker JSON-RPC.
51#[derive(Debug, Default)]
52pub struct PluginPollerRouter {
53    /// All registered handles. Lookup is by kind, but we store
54    /// handles (not kind→handle map) so a single plugin owning N
55    /// kinds is one row.
56    handles: std::sync::RwLock<Vec<Arc<PluginPollerHandle>>>,
57}
58
59impl PluginPollerRouter {
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Register a plugin's poller capability. Returns an error if
65    /// any of the requested `kinds` is already owned by a different
66    /// plugin (cross-manifest uniqueness — boot-time fail-fast).
67    /// Re-registration of the same `plugin_id` is allowed and
68    /// replaces the previous entry (hot-spawn restart).
69    pub fn register(&self, handle: PluginPollerHandle) -> Result<(), PollerRouteRegistrationError> {
70        let mut all = self.handles.write().expect("router lock poisoned");
71        for existing in all.iter() {
72            if existing.plugin_id == handle.plugin_id {
73                // Hot-spawn restart — replaced below.
74                continue;
75            }
76            for k in &handle.kinds {
77                if existing.kinds.iter().any(|ek| ek == k) {
78                    return Err(PollerRouteRegistrationError::DuplicateKind {
79                        kind: k.clone(),
80                        existing_plugin: existing.plugin_id.clone(),
81                        new_plugin: handle.plugin_id.clone(),
82                    });
83                }
84            }
85        }
86        all.retain(|h| h.plugin_id != handle.plugin_id);
87        all.push(Arc::new(handle));
88        Ok(())
89    }
90
91    /// Drop the handle for a given plugin id (hot-remove).
92    pub fn unregister(&self, plugin_id: &str) -> bool {
93        let mut all = self.handles.write().expect("router lock poisoned");
94        let before = all.len();
95        all.retain(|h| h.plugin_id != plugin_id);
96        all.len() != before
97    }
98
99    /// Look up the handle that owns `kind`. Returns an `Arc` clone
100    /// so the caller can drop the read lock immediately.
101    pub fn handle_for_kind(&self, kind: &str) -> Option<Arc<PluginPollerHandle>> {
102        let all = self.handles.read().expect("router lock poisoned");
103        all.iter()
104            .find(|h| h.kinds.iter().any(|k| k == kind))
105            .cloned()
106    }
107
108    /// Look up the handle owned by `plugin_id`. Used by the daemon
109    /// boot loop to wire one `PluginPollerProxy` per declared kind.
110    pub fn handles_for_plugin(&self, plugin_id: &str) -> Option<Arc<PluginPollerHandle>> {
111        let all = self.handles.read().expect("router lock poisoned");
112        all.iter().find(|h| h.plugin_id == plugin_id).cloned()
113    }
114
115    /// True when no plugins are registered. Used by the runner's
116    /// fast-path skip.
117    pub fn is_empty(&self) -> bool {
118        self.handles
119            .read()
120            .expect("router lock poisoned")
121            .is_empty()
122    }
123
124    /// Number of registered plugins. For metrics.
125    pub fn len(&self) -> usize {
126        self.handles.read().expect("router lock poisoned").len()
127    }
128}
129
130/// JSON-RPC tick request payload. The plugin subprocess receives
131/// this verbatim and replies with [`TickReply`].
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct TickRequest {
134    pub kind: String,
135    pub job_id: String,
136    pub agent_id: String,
137    /// Cursor as URL-safe base64 (no padding). Subprocess decodes
138    /// before passing to its handler.
139    pub cursor: Option<String>,
140    pub config: Value,
141    /// RFC3339 timestamp.
142    pub now: String,
143    pub interval_hint_secs: u64,
144}
145
146/// Plugin reply to a tick request. Wire-compatible with
147/// [`TickAck`] but carries cursor as base64 so the message survives
148/// any text-only broker transport.
149#[derive(Debug, Clone, Serialize, Deserialize, Default)]
150pub struct TickReply {
151    #[serde(default)]
152    pub next_cursor: Option<String>,
153    #[serde(default)]
154    pub next_interval_secs: Option<u64>,
155    #[serde(default)]
156    pub metrics: Option<TickMetrics>,
157}
158
159impl TickReply {
160    /// Decode the reply into the runner-facing [`TickAck`] shape,
161    /// translating the base64 cursor back into raw bytes.
162    pub fn into_tick_ack(self) -> Result<TickAck, PluginPollerForwardError> {
163        let next_cursor = match self.next_cursor {
164            Some(s) => Some(decode_cursor(&s)?),
165            None => None,
166        };
167        Ok(TickAck {
168            next_cursor,
169            next_interval_hint: self.next_interval_secs.map(Duration::from_secs),
170            metrics: self.metrics,
171        })
172    }
173}
174
175/// Build the JSON-RPC tick request from runner inputs. Pure fn so
176/// the caller (or a future test harness) can exercise the wire
177/// shape without a broker.
178pub fn build_tick_request(
179    kind: &str,
180    job_id: &str,
181    agent_id: &str,
182    cursor: Option<&[u8]>,
183    config: Value,
184    now: DateTime<Utc>,
185    interval_hint: Duration,
186) -> TickRequest {
187    TickRequest {
188        kind: kind.to_string(),
189        job_id: job_id.to_string(),
190        agent_id: agent_id.to_string(),
191        cursor: cursor.map(encode_cursor),
192        config,
193        now: now.to_rfc3339(),
194        interval_hint_secs: interval_hint.as_secs(),
195    }
196}
197
198/// Forward a tick request to the plugin via broker JSON-RPC. Caller
199/// owns the [`TickRequest`] construction so this fn can be a thin
200/// IO wrapper. Returns the parsed [`TickReply`]; conversion to
201/// [`TickAck`] is the runner's concern.
202pub async fn forward_tick(
203    broker: &AnyBroker,
204    handle: &PluginPollerHandle,
205    request: TickRequest,
206) -> Result<TickReply, PluginPollerForwardError> {
207    let topic = handle.tick_topic();
208    let payload = json!({
209        "method": "poll_tick",
210        "params": request,
211    });
212    let msg = Message::new(topic.clone(), payload);
213    let reply = broker
214        .request(&topic, msg, handle.tick_timeout)
215        .await
216        .map_err(|e| PluginPollerForwardError::Broker(e.to_string()))?;
217    serde_json::from_value::<TickReply>(reply.payload).map_err(|e| {
218        PluginPollerForwardError::ParseReply(format!(
219            "plugin {} returned malformed poll_tick reply: {e}",
220            handle.plugin_id
221        ))
222    })
223}
224
225/// Phase 96.E — daemon-side `Poller` impl that spawns a fresh
226/// subprocess per tick, drives the tick over stdio JSON-RPC, kills
227/// the child on reply. Used when the plugin manifest declares
228/// `lifecycle = "ephemeral"`.
229///
230/// Wire shape (one JSON line each):
231///
232/// ```text
233/// daemon → subprocess.stdin:  {"method":"poll_tick","params":{...TickRequest}}
234/// subprocess → daemon.stdout: {"result":{...TickReply}} | {"error":{code,message}}
235/// ```
236///
237/// **Limitations of V1 ephemeral**:
238/// - No reverse-RPC during tick. The subprocess receives one input
239///   (TickRequest) + writes one output (TickReply). For credential
240///   resolution or LLM invocation, use `lifecycle = "long_lived"`.
241/// - The subprocess inherits the daemon's `NEXO_BROKER_URL` env so
242///   it can publish outbound directly if needed (Phase 92 pattern).
243pub struct EphemeralPollerProxy {
244    kind: &'static str,
245    handle: Arc<PluginPollerHandle>,
246}
247
248impl EphemeralPollerProxy {
249    /// Wrap a `(handle, kind)` pair for spawn-per-tick dispatch.
250    /// `handle.entrypoint_command` MUST be `Some` — the proxy uses
251    /// it as the binary path.
252    pub fn new(kind: &'static str, handle: Arc<PluginPollerHandle>) -> Self {
253        Self { kind, handle }
254    }
255}
256
257#[async_trait]
258impl Poller for EphemeralPollerProxy {
259    fn kind(&self) -> &'static str {
260        self.kind
261    }
262
263    fn description(&self) -> &'static str {
264        "(plugin v2 subprocess — ephemeral, spawn-per-tick)"
265    }
266
267    async fn tick(&self, ctx: &PollContext) -> Result<TickAck, PollerError> {
268        let command =
269            self.handle
270                .entrypoint_command
271                .as_deref()
272                .ok_or_else(|| PollerError::Config {
273                    job: ctx.job_id.clone(),
274                    reason: format!(
275                        "ephemeral poller '{}' has no [plugin.entrypoint] command",
276                        self.handle.plugin_id
277                    ),
278                })?;
279        let request = build_tick_request(
280            self.kind,
281            &ctx.job_id,
282            &ctx.agent_id,
283            ctx.cursor.as_deref(),
284            ctx.config.clone(),
285            ctx.now,
286            ctx.interval_hint,
287        );
288        spawn_ephemeral_tick(
289            command,
290            &self.handle.plugin_id,
291            request,
292            self.handle.tick_timeout,
293            ctx.cancel.clone(),
294        )
295        .await
296    }
297}
298
299/// Spawn the binary, write the tick request to stdin, read one
300/// JSON line from stdout, await exit, classify the reply.
301pub async fn spawn_ephemeral_tick(
302    command: &str,
303    plugin_id: &str,
304    request: TickRequest,
305    timeout: Duration,
306    cancel: tokio_util::sync::CancellationToken,
307) -> Result<TickAck, PollerError> {
308    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
309    use tokio::process::Command;
310
311    let request_json = serde_json::to_string(&json!({
312        "method": "poll_tick",
313        "params": request,
314    }))
315    .map_err(|e| PollerError::Config {
316        job: request.job_id.clone(),
317        reason: format!("serialize TickRequest: {e}"),
318    })?;
319
320    let mut child = Command::new(command)
321        .stdin(std::process::Stdio::piped())
322        .stdout(std::process::Stdio::piped())
323        .stderr(std::process::Stdio::piped())
324        .env("NEXO_POLLER_EPHEMERAL", "1")
325        .env("NEXO_POLLER_PLUGIN_ID", plugin_id)
326        .spawn()
327        .map_err(|e| PollerError::Transient(anyhow::anyhow!("spawn '{command}' failed: {e}")))?;
328
329    {
330        let mut stdin = child.stdin.take().ok_or_else(|| {
331            PollerError::Transient(anyhow::anyhow!("subprocess stdin not captured"))
332        })?;
333        stdin
334            .write_all(request_json.as_bytes())
335            .await
336            .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
337        stdin
338            .write_all(b"\n")
339            .await
340            .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
341        // Closing stdin signals end-of-input. Subprocess proceeds.
342    }
343
344    let mut stdout = BufReader::new(
345        child
346            .stdout
347            .take()
348            .ok_or_else(|| PollerError::Transient(anyhow::anyhow!("stdout not captured")))?,
349    );
350    let mut line = String::new();
351    let read = tokio::select! {
352        r = stdout.read_line(&mut line) => r,
353        _ = tokio::time::sleep(timeout) => {
354            let _ = child.kill().await;
355            return Err(PollerError::Transient(anyhow::anyhow!(
356                "ephemeral subprocess exceeded tick_timeout ({timeout:?})"
357            )));
358        }
359        _ = cancel.cancelled() => {
360            let _ = child.kill().await;
361            return Err(PollerError::Transient(anyhow::anyhow!(
362                "ephemeral subprocess cancelled (shutdown or hot-reload)"
363            )));
364        }
365    };
366    read.map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
367    let _ = child.wait().await;
368
369    let trimmed = line.trim();
370    if trimmed.is_empty() {
371        return Err(PollerError::Transient(anyhow::anyhow!(
372            "ephemeral subprocess wrote no reply"
373        )));
374    }
375    let envelope: Value = serde_json::from_str(trimmed).map_err(|e| {
376        PollerError::Transient(anyhow::anyhow!(
377            "ephemeral reply parse failed: {e} (line: {trimmed:.200})"
378        ))
379    })?;
380    if let Some(err) = envelope.get("error") {
381        let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(-32603);
382        let message = err
383            .get("message")
384            .and_then(|v| v.as_str())
385            .unwrap_or("subprocess error")
386            .to_string();
387        return Err(match code {
388            -32002 => PollerError::Permanent(anyhow::anyhow!("ephemeral: {message}")),
389            -32602 => PollerError::Config {
390                job: request.job_id.clone(),
391                reason: message,
392            },
393            _ => PollerError::Transient(anyhow::anyhow!("ephemeral rpc {code}: {message}")),
394        });
395    }
396    let result = envelope.get("result").cloned().unwrap_or(Value::Null);
397    let reply: TickReply = serde_json::from_value(result)
398        .map_err(|e| PollerError::Transient(anyhow::anyhow!("ephemeral TickReply parse: {e}")))?;
399    reply
400        .into_tick_ack()
401        .map_err(|e| PollerError::Transient(anyhow::anyhow!("cursor decode: {e}")))
402}
403
404/// Daemon-side `Poller` impl that proxies every tick to a subprocess
405/// plugin via broker JSON-RPC. Registered with the runner alongside
406/// in-tree builtins so the runner sees a single homogeneous registry.
407///
408/// `kind()` returns the leaked `&'static str` for one of the
409/// plugin's declared kinds; multi-kind plugins register one proxy
410/// per kind sharing the same `PluginPollerHandle`.
411pub struct PluginPollerProxy {
412    kind: &'static str,
413    handle: Arc<PluginPollerHandle>,
414    broker: AnyBroker,
415}
416
417impl PluginPollerProxy {
418    /// Wrap a `(handle, kind)` pair into a `Poller` impl. `kind` must
419    /// be one of `handle.kinds`; the runner validates this at boot.
420    pub fn new(kind: &'static str, handle: Arc<PluginPollerHandle>, broker: AnyBroker) -> Self {
421        Self {
422            kind,
423            handle,
424            broker,
425        }
426    }
427}
428
429#[async_trait]
430impl Poller for PluginPollerProxy {
431    fn kind(&self) -> &'static str {
432        self.kind
433    }
434
435    fn description(&self) -> &'static str {
436        "(plugin v2 subprocess via [plugin.poller])"
437    }
438
439    async fn tick(&self, ctx: &PollContext) -> Result<TickAck, PollerError> {
440        let request = build_tick_request(
441            self.kind,
442            &ctx.job_id,
443            &ctx.agent_id,
444            ctx.cursor.as_deref(),
445            ctx.config.clone(),
446            ctx.now,
447            ctx.interval_hint,
448        );
449
450        let reply = forward_tick(&self.broker, &self.handle, request)
451            .await
452            .map_err(|e| match e {
453                PluginPollerForwardError::Broker(s) => {
454                    PollerError::Transient(anyhow::anyhow!("plugin poller broker: {s}"))
455                }
456                PluginPollerForwardError::ParseReply(s) => {
457                    PollerError::Transient(anyhow::anyhow!("plugin poller reply parse: {s}"))
458                }
459            })?;
460
461        reply.into_tick_ack().map_err(|e| {
462            PollerError::Transient(anyhow::anyhow!("plugin poller cursor decode: {e}"))
463        })
464    }
465}
466
467pub fn encode_cursor(raw: &[u8]) -> String {
468    use base64::Engine;
469    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(raw)
470}
471
472fn decode_cursor(s: &str) -> Result<Vec<u8>, PluginPollerForwardError> {
473    use base64::Engine;
474    base64::engine::general_purpose::URL_SAFE_NO_PAD
475        .decode(s.trim_end_matches('='))
476        .map_err(|e| PluginPollerForwardError::ParseReply(format!("cursor base64: {e}")))
477}
478
479#[derive(Debug, thiserror::Error)]
480pub enum PollerRouteRegistrationError {
481    #[error(
482        "kind `{kind}` already owned by plugin `{existing_plugin}` — `{new_plugin}` cannot register"
483    )]
484    DuplicateKind {
485        kind: String,
486        existing_plugin: String,
487        new_plugin: String,
488    },
489}
490
491#[derive(Debug, thiserror::Error)]
492pub enum PluginPollerForwardError {
493    #[error("broker error: {0}")]
494    Broker(String),
495    #[error("plugin reply parse error: {0}")]
496    ParseReply(String),
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a long-lived handle with a 60s timeout and no entrypoint.
    fn handle(plugin_id: &str, kinds: &[&str], topic: &str) -> PluginPollerHandle {
        let kinds: Vec<String> = kinds.iter().map(|k| k.to_string()).collect();
        PluginPollerHandle {
            plugin_id: plugin_id.to_string(),
            kinds,
            broker_topic_prefix: topic.to_string(),
            lifecycle: PollerLifecycle::LongLived,
            max_concurrent_ticks: 1,
            tick_timeout: Duration::from_secs(60),
            entrypoint_command: None,
        }
    }

    #[test]
    fn register_and_lookup_single_kind() {
        let router = PluginPollerRouter::new();
        let gcal = handle("gcal", &["google_calendar"], "plugin.poller.gcal");
        router.register(gcal).unwrap();

        let found = router.handle_for_kind("google_calendar").expect("found");
        assert_eq!(found.plugin_id, "gcal");
        assert!(router.handle_for_kind("unknown").is_none());
    }

    #[test]
    fn register_one_plugin_with_multiple_kinds() {
        let router = PluginPollerRouter::new();
        let google = handle("google", &["gmail", "google_calendar"], "plugin.google");
        router.register(google).unwrap();

        assert!(router.handle_for_kind("gmail").is_some());
        assert!(router.handle_for_kind("google_calendar").is_some());
        assert_eq!(router.len(), 1);
    }

    #[test]
    fn register_rejects_duplicate_kind_across_plugins() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("gcal_a", &["google_calendar"], "plugin.a"))
            .unwrap();

        let rejected = router
            .register(handle("gcal_b", &["google_calendar"], "plugin.b"))
            .expect_err("dup kind rejected");

        let PollerRouteRegistrationError::DuplicateKind { kind, .. } = rejected;
        assert_eq!(kind, "google_calendar");
    }

    #[test]
    fn register_same_plugin_id_replaces_previous() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("gcal", &["google_calendar"], "plugin.poller.v1"))
            .unwrap();
        router
            .register(handle("gcal", &["google_calendar"], "plugin.poller.v2"))
            .expect("replace allowed");

        assert_eq!(router.len(), 1);
        let current = router.handle_for_kind("google_calendar").unwrap();
        assert_eq!(current.broker_topic_prefix, "plugin.poller.v2");
    }

    #[test]
    fn unregister_drops_handle() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("rss", &["rss"], "plugin.poller.rss"))
            .unwrap();

        assert!(router.unregister("rss"));
        assert!(router.is_empty());
        // A second removal finds nothing to drop.
        assert!(!router.unregister("rss"));
    }

    #[test]
    fn build_tick_request_serializes_cursor_b64() {
        let now = DateTime::parse_from_rfc3339("2026-05-17T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let req = build_tick_request(
            "rss",
            "job-1",
            "ana",
            Some(b"hello"),
            json!({"k": "v"}),
            now,
            Duration::from_secs(300),
        );

        assert_eq!(req.kind, "rss");
        assert_eq!(req.cursor.as_deref(), Some("aGVsbG8"));
        assert_eq!(req.now, "2026-05-17T10:00:00+00:00");
        assert_eq!(req.interval_hint_secs, 300);
    }

    #[test]
    fn build_tick_request_omits_cursor_when_none() {
        let req = build_tick_request(
            "rss",
            "job-1",
            "ana",
            None,
            Value::Null,
            Utc::now(),
            Duration::from_secs(60),
        );

        assert!(req.cursor.is_none());
    }

    #[test]
    fn tick_reply_decodes_cursor_round_trip() {
        let metrics = TickMetrics {
            items_seen: 5,
            items_dispatched: 2,
        };
        let reply = TickReply {
            next_cursor: Some(encode_cursor(b"world")),
            next_interval_secs: Some(120),
            metrics: Some(metrics),
        };

        let ack = reply.into_tick_ack().unwrap();

        assert_eq!(ack.next_cursor.as_deref(), Some(b"world".as_slice()));
        assert_eq!(ack.next_interval_hint, Some(Duration::from_secs(120)));
        let counters = ack.metrics.unwrap();
        assert_eq!(counters.items_seen, 5);
        assert_eq!(counters.items_dispatched, 2);
    }

    #[test]
    fn tick_reply_handles_empty() {
        let ack = TickReply::default().into_tick_ack().unwrap();

        assert!(ack.next_cursor.is_none());
        assert!(ack.next_interval_hint.is_none());
        assert!(ack.metrics.is_none());
    }

    #[test]
    fn tick_reply_bad_cursor_b64_errors() {
        let reply = TickReply {
            next_cursor: Some(String::from("!!not_b64!!")),
            ..TickReply::default()
        };

        let err = reply.into_tick_ack().unwrap_err();
        assert!(matches!(err, PluginPollerForwardError::ParseReply(_)));
    }

    #[test]
    fn handle_tick_topic_appends_dot_tick() {
        let rss = handle("rss", &["rss"], "plugin.poller.rss");
        assert_eq!(rss.tick_topic(), "plugin.poller.rss.tick");
    }
}