1use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use chrono::{DateTime, Utc};
21use nexo_broker::{AnyBroker, BrokerHandle, Message};
22use nexo_plugin_manifest::poller::PollerLifecycle;
23use nexo_poller::{PollContext, Poller, PollerError, TickAck, TickMetrics};
24use serde::{Deserialize, Serialize};
25use serde_json::{json, Value};
26
27#[derive(Debug, Clone)]
29pub struct PluginPollerHandle {
30 pub plugin_id: String,
31 pub kinds: Vec<String>,
32 pub broker_topic_prefix: String,
33 pub lifecycle: PollerLifecycle,
34 pub max_concurrent_ticks: u32,
35 pub tick_timeout: Duration,
36 pub entrypoint_command: Option<String>,
41}
42
43impl PluginPollerHandle {
44 pub fn tick_topic(&self) -> String {
45 format!("{}.tick", self.broker_topic_prefix)
46 }
47}
48
49#[derive(Debug, Default)]
52pub struct PluginPollerRouter {
53 handles: std::sync::RwLock<Vec<Arc<PluginPollerHandle>>>,
57}
58
59impl PluginPollerRouter {
60 pub fn new() -> Self {
61 Self::default()
62 }
63
64 pub fn register(&self, handle: PluginPollerHandle) -> Result<(), PollerRouteRegistrationError> {
70 let mut all = self.handles.write().expect("router lock poisoned");
71 for existing in all.iter() {
72 if existing.plugin_id == handle.plugin_id {
73 continue;
75 }
76 for k in &handle.kinds {
77 if existing.kinds.iter().any(|ek| ek == k) {
78 return Err(PollerRouteRegistrationError::DuplicateKind {
79 kind: k.clone(),
80 existing_plugin: existing.plugin_id.clone(),
81 new_plugin: handle.plugin_id.clone(),
82 });
83 }
84 }
85 }
86 all.retain(|h| h.plugin_id != handle.plugin_id);
87 all.push(Arc::new(handle));
88 Ok(())
89 }
90
91 pub fn unregister(&self, plugin_id: &str) -> bool {
93 let mut all = self.handles.write().expect("router lock poisoned");
94 let before = all.len();
95 all.retain(|h| h.plugin_id != plugin_id);
96 all.len() != before
97 }
98
99 pub fn handle_for_kind(&self, kind: &str) -> Option<Arc<PluginPollerHandle>> {
102 let all = self.handles.read().expect("router lock poisoned");
103 all.iter()
104 .find(|h| h.kinds.iter().any(|k| k == kind))
105 .cloned()
106 }
107
108 pub fn handles_for_plugin(&self, plugin_id: &str) -> Option<Arc<PluginPollerHandle>> {
111 let all = self.handles.read().expect("router lock poisoned");
112 all.iter().find(|h| h.plugin_id == plugin_id).cloned()
113 }
114
115 pub fn is_empty(&self) -> bool {
118 self.handles
119 .read()
120 .expect("router lock poisoned")
121 .is_empty()
122 }
123
124 pub fn len(&self) -> usize {
126 self.handles.read().expect("router lock poisoned").len()
127 }
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct TickRequest {
134 pub kind: String,
135 pub job_id: String,
136 pub agent_id: String,
137 pub cursor: Option<String>,
140 pub config: Value,
141 pub now: String,
143 pub interval_hint_secs: u64,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize, Default)]
150pub struct TickReply {
151 #[serde(default)]
152 pub next_cursor: Option<String>,
153 #[serde(default)]
154 pub next_interval_secs: Option<u64>,
155 #[serde(default)]
156 pub metrics: Option<TickMetrics>,
157}
158
159impl TickReply {
160 pub fn into_tick_ack(self) -> Result<TickAck, PluginPollerForwardError> {
163 let next_cursor = match self.next_cursor {
164 Some(s) => Some(decode_cursor(&s)?),
165 None => None,
166 };
167 Ok(TickAck {
168 next_cursor,
169 next_interval_hint: self.next_interval_secs.map(Duration::from_secs),
170 metrics: self.metrics,
171 })
172 }
173}
174
175pub fn build_tick_request(
179 kind: &str,
180 job_id: &str,
181 agent_id: &str,
182 cursor: Option<&[u8]>,
183 config: Value,
184 now: DateTime<Utc>,
185 interval_hint: Duration,
186) -> TickRequest {
187 TickRequest {
188 kind: kind.to_string(),
189 job_id: job_id.to_string(),
190 agent_id: agent_id.to_string(),
191 cursor: cursor.map(encode_cursor),
192 config,
193 now: now.to_rfc3339(),
194 interval_hint_secs: interval_hint.as_secs(),
195 }
196}
197
198pub async fn forward_tick(
203 broker: &AnyBroker,
204 handle: &PluginPollerHandle,
205 request: TickRequest,
206) -> Result<TickReply, PluginPollerForwardError> {
207 let topic = handle.tick_topic();
208 let payload = json!({
209 "method": "poll_tick",
210 "params": request,
211 });
212 let msg = Message::new(topic.clone(), payload);
213 let reply = broker
214 .request(&topic, msg, handle.tick_timeout)
215 .await
216 .map_err(|e| PluginPollerForwardError::Broker(e.to_string()))?;
217 serde_json::from_value::<TickReply>(reply.payload).map_err(|e| {
218 PluginPollerForwardError::ParseReply(format!(
219 "plugin {} returned malformed poll_tick reply: {e}",
220 handle.plugin_id
221 ))
222 })
223}
224
225pub struct EphemeralPollerProxy {
244 kind: &'static str,
245 handle: Arc<PluginPollerHandle>,
246}
247
248impl EphemeralPollerProxy {
249 pub fn new(kind: &'static str, handle: Arc<PluginPollerHandle>) -> Self {
253 Self { kind, handle }
254 }
255}
256
257#[async_trait]
258impl Poller for EphemeralPollerProxy {
259 fn kind(&self) -> &'static str {
260 self.kind
261 }
262
263 fn description(&self) -> &'static str {
264 "(plugin v2 subprocess — ephemeral, spawn-per-tick)"
265 }
266
267 async fn tick(&self, ctx: &PollContext) -> Result<TickAck, PollerError> {
268 let command =
269 self.handle
270 .entrypoint_command
271 .as_deref()
272 .ok_or_else(|| PollerError::Config {
273 job: ctx.job_id.clone(),
274 reason: format!(
275 "ephemeral poller '{}' has no [plugin.entrypoint] command",
276 self.handle.plugin_id
277 ),
278 })?;
279 let request = build_tick_request(
280 self.kind,
281 &ctx.job_id,
282 &ctx.agent_id,
283 ctx.cursor.as_deref(),
284 ctx.config.clone(),
285 ctx.now,
286 ctx.interval_hint,
287 );
288 spawn_ephemeral_tick(
289 command,
290 &self.handle.plugin_id,
291 request,
292 self.handle.tick_timeout,
293 ctx.cancel.clone(),
294 )
295 .await
296 }
297}
298
299pub async fn spawn_ephemeral_tick(
302 command: &str,
303 plugin_id: &str,
304 request: TickRequest,
305 timeout: Duration,
306 cancel: tokio_util::sync::CancellationToken,
307) -> Result<TickAck, PollerError> {
308 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
309 use tokio::process::Command;
310
311 let request_json = serde_json::to_string(&json!({
312 "method": "poll_tick",
313 "params": request,
314 }))
315 .map_err(|e| PollerError::Config {
316 job: request.job_id.clone(),
317 reason: format!("serialize TickRequest: {e}"),
318 })?;
319
320 let mut child = Command::new(command)
321 .stdin(std::process::Stdio::piped())
322 .stdout(std::process::Stdio::piped())
323 .stderr(std::process::Stdio::piped())
324 .env("NEXO_POLLER_EPHEMERAL", "1")
325 .env("NEXO_POLLER_PLUGIN_ID", plugin_id)
326 .spawn()
327 .map_err(|e| PollerError::Transient(anyhow::anyhow!("spawn '{command}' failed: {e}")))?;
328
329 {
330 let mut stdin = child.stdin.take().ok_or_else(|| {
331 PollerError::Transient(anyhow::anyhow!("subprocess stdin not captured"))
332 })?;
333 stdin
334 .write_all(request_json.as_bytes())
335 .await
336 .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
337 stdin
338 .write_all(b"\n")
339 .await
340 .map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
341 }
343
344 let mut stdout = BufReader::new(
345 child
346 .stdout
347 .take()
348 .ok_or_else(|| PollerError::Transient(anyhow::anyhow!("stdout not captured")))?,
349 );
350 let mut line = String::new();
351 let read = tokio::select! {
352 r = stdout.read_line(&mut line) => r,
353 _ = tokio::time::sleep(timeout) => {
354 let _ = child.kill().await;
355 return Err(PollerError::Transient(anyhow::anyhow!(
356 "ephemeral subprocess exceeded tick_timeout ({timeout:?})"
357 )));
358 }
359 _ = cancel.cancelled() => {
360 let _ = child.kill().await;
361 return Err(PollerError::Transient(anyhow::anyhow!(
362 "ephemeral subprocess cancelled (shutdown or hot-reload)"
363 )));
364 }
365 };
366 read.map_err(|e| PollerError::Transient(anyhow::Error::from(e)))?;
367 let _ = child.wait().await;
368
369 let trimmed = line.trim();
370 if trimmed.is_empty() {
371 return Err(PollerError::Transient(anyhow::anyhow!(
372 "ephemeral subprocess wrote no reply"
373 )));
374 }
375 let envelope: Value = serde_json::from_str(trimmed).map_err(|e| {
376 PollerError::Transient(anyhow::anyhow!(
377 "ephemeral reply parse failed: {e} (line: {trimmed:.200})"
378 ))
379 })?;
380 if let Some(err) = envelope.get("error") {
381 let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(-32603);
382 let message = err
383 .get("message")
384 .and_then(|v| v.as_str())
385 .unwrap_or("subprocess error")
386 .to_string();
387 return Err(match code {
388 -32002 => PollerError::Permanent(anyhow::anyhow!("ephemeral: {message}")),
389 -32602 => PollerError::Config {
390 job: request.job_id.clone(),
391 reason: message,
392 },
393 _ => PollerError::Transient(anyhow::anyhow!("ephemeral rpc {code}: {message}")),
394 });
395 }
396 let result = envelope.get("result").cloned().unwrap_or(Value::Null);
397 let reply: TickReply = serde_json::from_value(result)
398 .map_err(|e| PollerError::Transient(anyhow::anyhow!("ephemeral TickReply parse: {e}")))?;
399 reply
400 .into_tick_ack()
401 .map_err(|e| PollerError::Transient(anyhow::anyhow!("cursor decode: {e}")))
402}
403
404pub struct PluginPollerProxy {
412 kind: &'static str,
413 handle: Arc<PluginPollerHandle>,
414 broker: AnyBroker,
415}
416
417impl PluginPollerProxy {
418 pub fn new(kind: &'static str, handle: Arc<PluginPollerHandle>, broker: AnyBroker) -> Self {
421 Self {
422 kind,
423 handle,
424 broker,
425 }
426 }
427}
428
429#[async_trait]
430impl Poller for PluginPollerProxy {
431 fn kind(&self) -> &'static str {
432 self.kind
433 }
434
435 fn description(&self) -> &'static str {
436 "(plugin v2 subprocess via [plugin.poller])"
437 }
438
439 async fn tick(&self, ctx: &PollContext) -> Result<TickAck, PollerError> {
440 let request = build_tick_request(
441 self.kind,
442 &ctx.job_id,
443 &ctx.agent_id,
444 ctx.cursor.as_deref(),
445 ctx.config.clone(),
446 ctx.now,
447 ctx.interval_hint,
448 );
449
450 let reply = forward_tick(&self.broker, &self.handle, request)
451 .await
452 .map_err(|e| match e {
453 PluginPollerForwardError::Broker(s) => {
454 PollerError::Transient(anyhow::anyhow!("plugin poller broker: {s}"))
455 }
456 PluginPollerForwardError::ParseReply(s) => {
457 PollerError::Transient(anyhow::anyhow!("plugin poller reply parse: {s}"))
458 }
459 })?;
460
461 reply.into_tick_ack().map_err(|e| {
462 PollerError::Transient(anyhow::anyhow!("plugin poller cursor decode: {e}"))
463 })
464 }
465}
466
467pub fn encode_cursor(raw: &[u8]) -> String {
468 use base64::Engine;
469 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(raw)
470}
471
472fn decode_cursor(s: &str) -> Result<Vec<u8>, PluginPollerForwardError> {
473 use base64::Engine;
474 base64::engine::general_purpose::URL_SAFE_NO_PAD
475 .decode(s.trim_end_matches('='))
476 .map_err(|e| PluginPollerForwardError::ParseReply(format!("cursor base64: {e}")))
477}
478
479#[derive(Debug, thiserror::Error)]
480pub enum PollerRouteRegistrationError {
481 #[error(
482 "kind `{kind}` already owned by plugin `{existing_plugin}` — `{new_plugin}` cannot register"
483 )]
484 DuplicateKind {
485 kind: String,
486 existing_plugin: String,
487 new_plugin: String,
488 },
489}
490
491#[derive(Debug, thiserror::Error)]
492pub enum PluginPollerForwardError {
493 #[error("broker error: {0}")]
494 Broker(String),
495 #[error("plugin reply parse error: {0}")]
496 ParseReply(String),
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a long-lived handle with fixed limits and no entrypoint
    /// command; only id, kinds and topic prefix vary between tests.
    fn handle(plugin_id: &str, kinds: &[&str], topic: &str) -> PluginPollerHandle {
        let kinds = kinds.iter().map(|&kind| String::from(kind)).collect();
        PluginPollerHandle {
            plugin_id: String::from(plugin_id),
            kinds,
            broker_topic_prefix: String::from(topic),
            lifecycle: PollerLifecycle::LongLived,
            max_concurrent_ticks: 1,
            tick_timeout: Duration::from_secs(60),
            entrypoint_command: None,
        }
    }

    /// A registered kind resolves to its plugin; an unknown kind does not.
    #[test]
    fn register_and_lookup_single_kind() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("gcal", &["google_calendar"], "plugin.poller.gcal"))
            .unwrap();
        let found = router.handle_for_kind("google_calendar").expect("found");
        assert_eq!(found.plugin_id, "gcal");
        assert!(router.handle_for_kind("unknown").is_none());
    }

    /// One plugin may serve several kinds and still counts as one entry.
    #[test]
    fn register_one_plugin_with_multiple_kinds() {
        let router = PluginPollerRouter::new();
        let google = handle("google", &["gmail", "google_calendar"], "plugin.google");
        router.register(google).unwrap();
        assert!(router.handle_for_kind("gmail").is_some());
        assert!(router.handle_for_kind("google_calendar").is_some());
        assert_eq!(router.len(), 1);
    }

    /// A second plugin claiming an already-served kind is rejected.
    #[test]
    fn register_rejects_duplicate_kind_across_plugins() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("gcal_a", &["google_calendar"], "plugin.a"))
            .unwrap();
        let rejection = router
            .register(handle("gcal_b", &["google_calendar"], "plugin.b"))
            .expect_err("dup kind rejected");
        let PollerRouteRegistrationError::DuplicateKind { kind, .. } = rejection;
        assert_eq!(kind, "google_calendar");
    }

    /// Re-registering the same plugin id swaps the handle in place.
    #[test]
    fn register_same_plugin_id_replaces_previous() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("gcal", &["google_calendar"], "plugin.poller.v1"))
            .unwrap();
        router
            .register(handle("gcal", &["google_calendar"], "plugin.poller.v2"))
            .expect("replace allowed");
        assert_eq!(router.len(), 1);
        let current = router.handle_for_kind("google_calendar").unwrap();
        assert_eq!(current.broker_topic_prefix, "plugin.poller.v2");
    }

    /// Unregistering removes the handle once; a second attempt is a no-op.
    #[test]
    fn unregister_drops_handle() {
        let router = PluginPollerRouter::new();
        router
            .register(handle("rss", &["rss"], "plugin.poller.rss"))
            .unwrap();
        assert!(router.unregister("rss"));
        assert!(router.is_empty());
        assert!(!router.unregister("rss"));
    }

    /// The request carries the cursor as unpadded URL-safe base64, the time
    /// as RFC 3339 and the interval in seconds.
    #[test]
    fn build_tick_request_serializes_cursor_b64() {
        let tick_time = DateTime::parse_from_rfc3339("2026-05-17T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let request = build_tick_request(
            "rss",
            "job-1",
            "ana",
            Some(b"hello"),
            json!({"k": "v"}),
            tick_time,
            Duration::from_secs(300),
        );
        assert_eq!(request.kind, "rss");
        assert_eq!(request.cursor.as_deref(), Some("aGVsbG8"));
        assert_eq!(request.now, "2026-05-17T10:00:00+00:00");
        assert_eq!(request.interval_hint_secs, 300);
    }

    /// Without a cursor the request's cursor field stays `None`.
    #[test]
    fn build_tick_request_omits_cursor_when_none() {
        let request = build_tick_request(
            "rss",
            "job-1",
            "ana",
            None,
            Value::Null,
            Utc::now(),
            Duration::from_secs(60),
        );
        assert!(request.cursor.is_none());
    }

    /// An encoded cursor survives the reply-to-ack conversion unchanged,
    /// together with the interval and the metrics.
    #[test]
    fn tick_reply_decodes_cursor_round_trip() {
        let wire = TickReply {
            next_cursor: Some(encode_cursor(b"world")),
            next_interval_secs: Some(120),
            metrics: Some(TickMetrics {
                items_seen: 5,
                items_dispatched: 2,
            }),
        };
        let ack = wire.into_tick_ack().unwrap();
        assert_eq!(ack.next_cursor.as_deref(), Some(b"world".as_slice()));
        assert_eq!(ack.next_interval_hint, Some(Duration::from_secs(120)));
        let metrics = ack.metrics.unwrap();
        assert_eq!(metrics.items_seen, 5);
        assert_eq!(metrics.items_dispatched, 2);
    }

    /// A default (all-`None`) reply converts to an all-`None` ack.
    #[test]
    fn tick_reply_handles_empty() {
        let ack = TickReply::default().into_tick_ack().unwrap();
        assert!(ack.next_cursor.is_none());
        assert!(ack.next_interval_hint.is_none());
        assert!(ack.metrics.is_none());
    }

    /// A cursor that is not valid base64 surfaces as a parse error.
    #[test]
    fn tick_reply_bad_cursor_b64_errors() {
        let wire = TickReply {
            next_cursor: Some("!!not_b64!!".into()),
            ..TickReply::default()
        };
        let failure = wire.into_tick_ack().unwrap_err();
        assert!(matches!(failure, PluginPollerForwardError::ParseReply(_)));
    }

    /// The tick topic is the topic prefix plus `.tick`.
    #[test]
    fn handle_tick_topic_appends_dot_tick() {
        let rss = handle("rss", &["rss"], "plugin.poller.rss");
        assert_eq!(rss.tick_topic(), "plugin.poller.rss.tick");
    }
}