use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use base64::Engine;
use nexo_broker::{AnyBroker, BrokerHandle, Event, Message};
use nexo_poller::{
HostError, LlmInvokeRequest, LlmInvokeResponse, LogLevel, PollerError, PollerHost, TickAck,
TickMetrics,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
/// Handler for poller ticks.
///
/// An implementor receives one [`TickRequest`] per call together with a shared
/// [`PollerHost`] handle, and answers with a [`TickAck`] on success or a
/// [`PollerError`] on failure.
#[async_trait]
pub trait PollerHandler: Send + Sync + 'static {
/// Processes a single tick.
///
/// `req` is the decoded request and `host` is the host-services handle the
/// handler may call while processing it.
async fn tick(
&self,
req: TickRequest,
host: Arc<dyn PollerHost>,
) -> Result<TickAck, PollerError>;
}
/// Parameters of a single poller tick, (de)serialized with serde.
///
/// In this file it is decoded from an incoming JSON request payload before
/// being handed to a `PollerHandler`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TickRequest {
/// Poller kind (the tests in this file use `"rss"`).
pub kind: String,
/// Identifier of the job being ticked; also attached to cursor decode errors.
pub job_id: String,
/// Agent identifier; forwarded to the host objects built in this file.
pub agent_id: String,
/// Optional cursor string; `cursor_bytes` decodes it as URL-safe base64.
/// Deserializes to `None` when the field is absent.
#[serde(default)]
pub cursor: Option<String>,
/// Free-form JSON configuration; not interpreted in this file.
pub config: Value,
/// Timestamp string supplied by the sender; not parsed in this file.
/// NOTE(review): the tests use an RFC 3339 style value — confirm the expected format.
pub now: String,
/// Interval hint in seconds; deserializes to `0` when the field is absent.
#[serde(default)]
pub interval_hint_secs: u64,
}
impl TickRequest {
    /// Decodes the optional `cursor` string into raw bytes.
    ///
    /// Returns `Ok(None)` when the request carries no cursor. Trailing `=`
    /// characters are stripped before the text is decoded with the URL-safe,
    /// unpadded base64 engine.
    ///
    /// # Errors
    ///
    /// Returns [`PollerError::Config`] tagged with this request's `job_id`
    /// when the cursor text cannot be decoded.
    pub fn cursor_bytes(&self) -> Result<Option<Vec<u8>>, PollerError> {
        let encoded = match self.cursor.as_deref() {
            Some(text) => text.trim_end_matches('='),
            None => return Ok(None),
        };
        match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(encoded) {
            Ok(raw) => Ok(Some(raw)),
            Err(e) => Err(PollerError::Config {
                job: self.job_id.clone(),
                reason: format!("cursor base64 decode: {e}"),
            }),
        }
    }
}
/// Wire form of a tick result.
///
/// Every field is optional: a `None` field is omitted when serializing and a
/// missing field deserializes to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TickReply {
/// Cursor for the next tick as a string; `from_tick_ack` fills it with
/// URL-safe, unpadded base64.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<String>,
/// Interval hint for the next tick, in whole seconds.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_interval_secs: Option<u64>,
/// Metrics reported by the handler for this tick, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metrics: Option<TickMetrics>,
}
impl TickReply {
pub fn from_tick_ack(ack: TickAck) -> Self {
Self {
next_cursor: ack
.next_cursor
.as_deref()
.map(|b| base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b)),
next_interval_secs: ack.next_interval_hint.map(|d| d.as_secs()),
metrics: ack.metrics,
}
}
}
/// `PollerHost` implementation that talks to the daemon through a broker.
///
/// Events are published directly on the broker; the other host services are
/// issued as request/reply calls on the `daemon.rpc.<plugin_id>` topic.
pub struct BrokerPollerHost {
/// Plugin identifier; used to build the RPC topic.
plugin_id: String,
/// Agent identifier attached to every RPC payload.
agent_id: String,
/// Job identifier attached to every RPC payload.
job_id: String,
/// Broker used for both publishing and request/reply.
broker: AnyBroker,
/// How long a reverse-RPC request may wait for its reply.
reverse_rpc_timeout: Duration,
}
impl BrokerPollerHost {
    /// Creates a host bound to one plugin / agent / job triple.
    ///
    /// The reverse-RPC timeout starts at 10 seconds; override it with
    /// [`with_reverse_rpc_timeout`](Self::with_reverse_rpc_timeout).
    pub fn new(
        plugin_id: impl Into<String>,
        agent_id: impl Into<String>,
        job_id: impl Into<String>,
        broker: AnyBroker,
    ) -> Self {
        Self {
            plugin_id: plugin_id.into(),
            agent_id: agent_id.into(),
            job_id: job_id.into(),
            broker,
            reverse_rpc_timeout: Duration::from_secs(10),
        }
    }

    /// Builder-style setter for the reverse-RPC reply timeout.
    pub fn with_reverse_rpc_timeout(mut self, t: Duration) -> Self {
        self.reverse_rpc_timeout = t;
        self
    }

    /// Topic that reverse-RPC requests are sent to: `daemon.rpc.<plugin_id>`.
    fn rpc_topic(&self) -> String {
        format!("daemon.rpc.{}", self.plugin_id)
    }

    /// Performs one request/reply call over the broker and unwraps the reply.
    ///
    /// The request payload carries `method`, `params`, and this host's
    /// `agent_id` / `job_id`. On success the reply's `result` member is
    /// returned, or `Value::Null` when the reply has none.
    ///
    /// # Errors
    ///
    /// * [`HostError::BrokerUnavailable`] when the broker request itself fails
    ///   (including whatever the broker reports on timeout).
    /// * [`HostError::Rpc`] when the reply contains an `error` member. A
    ///   missing, non-integer, or out-of-`i32`-range `code` becomes `-32603`;
    ///   a missing or non-string `message` becomes `"rpc error"`.
    async fn rpc_call(&self, method: &str, params: Value) -> Result<Value, HostError> {
        let topic = self.rpc_topic();
        let payload = json!({
            "method": method,
            "params": params,
            "agent_id": self.agent_id,
            "job_id": self.job_id,
        });
        let msg = Message::new(topic.clone(), payload);
        let reply = self
            .broker
            .request(&topic, msg, self.reverse_rpc_timeout)
            .await
            .map_err(|e| HostError::BrokerUnavailable(e.to_string()))?;
        if let Some(error) = reply.payload.get("error") {
            // A plain `as i32` cast would wrap an out-of-range code into an
            // unrelated value; treat such codes like a missing one instead.
            let code = error
                .get("code")
                .and_then(Value::as_i64)
                .and_then(|n| i32::try_from(n).ok())
                .unwrap_or(-32603);
            let message = error
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("rpc error")
                .to_string();
            return Err(HostError::Rpc { code, message });
        }
        Ok(reply.payload.get("result").cloned().unwrap_or(Value::Null))
    }
}
#[async_trait]
impl PollerHost for BrokerPollerHost {
    /// Publishes `payload` on `topic` as a `"plugin.poller"` event.
    ///
    /// The bytes are parsed as JSON; a payload that is not valid JSON is
    /// published as `null` rather than rejected.
    ///
    /// # Errors
    ///
    /// [`HostError::BrokerUnavailable`] when the broker publish fails.
    async fn broker_publish(&self, topic: String, payload: Vec<u8>) -> Result<(), HostError> {
        let value: Value = serde_json::from_slice(&payload).unwrap_or(Value::Null);
        let event = Event::new(&topic, "plugin.poller", value);
        self.broker
            .publish(&topic, event)
            .await
            .map_err(|e| HostError::BrokerUnavailable(e.to_string()))
    }

    /// Requests the credentials for `channel` via the `credentials_get` RPC.
    async fn credentials_get(&self, channel: String) -> Result<Value, HostError> {
        self.rpc_call(
            "credentials_get",
            json!({ "channel": channel, "agent_id": self.agent_id }),
        )
        .await
    }

    /// Forwards a log record via the `log` RPC; the reply value is discarded.
    async fn log(&self, level: LogLevel, message: String, fields: Value) -> Result<(), HostError> {
        self.rpc_call(
            "log",
            json!({
                "level": level,
                "message": message,
                "fields": fields,
            }),
        )
        .await
        .map(|_| ())
    }

    /// Increments a metric via the `metric_inc` RPC; the reply value is discarded.
    async fn metric_inc(&self, name: String, labels: Value) -> Result<(), HostError> {
        self.rpc_call("metric_inc", json!({ "name": name, "labels": labels }))
            .await
            .map(|_| ())
    }

    /// Invokes the LLM via the `llm_invoke` RPC and decodes the reply.
    ///
    /// # Errors
    ///
    /// [`HostError::Other`] when the request cannot be serialized or the reply
    /// cannot be decoded into an [`LlmInvokeResponse`]; otherwise whatever the
    /// underlying RPC call returns.
    async fn llm_invoke(&self, request: LlmInvokeRequest) -> Result<LlmInvokeResponse, HostError> {
        // Fail locally on a serialization error instead of sending a
        // `null`-params request that the remote side can only reject.
        let params = serde_json::to_value(request).map_err(|e| HostError::Other(e.into()))?;
        let reply = self.rpc_call("llm_invoke", params).await?;
        serde_json::from_value::<LlmInvokeResponse>(reply).map_err(|e| HostError::Other(e.into()))
    }
}
/// Serves one broker-delivered tick request and publishes the reply.
///
/// The request is taken from the `params` member of `request_payload` when
/// present, otherwise from the payload itself. The handler runs with a fresh
/// [`BrokerPollerHost`], and the outcome is published on `reply_to` as either
/// `{"result": ...}` or `{"error": {"code", "message"}}`.
///
/// A payload that does not decode into a [`TickRequest`] is answered with a
/// `-32602` error reply and the function still returns `Ok(())`. Failures
/// while publishing a reply are ignored (best effort).
///
/// # Errors
///
/// [`ServeError::MissingReplyTo`] when `reply_to` is `None`.
pub async fn serve_one_tick(
    plugin_id: &str,
    broker: AnyBroker,
    handler: Arc<dyn PollerHandler>,
    request_payload: Value,
    reply_to: Option<&str>,
) -> Result<(), ServeError> {
    let reply_topic = reply_to.ok_or(ServeError::MissingReplyTo)?.to_string();
    // The payload is owned and not used again, so the fallback moves it
    // instead of deep-cloning the whole JSON tree on every call.
    let request_envelope = request_payload
        .get("params")
        .cloned()
        .unwrap_or(request_payload);
    let request: TickRequest = match serde_json::from_value(request_envelope) {
        Ok(r) => r,
        Err(e) => {
            let err = json!({
                "error": { "code": -32602, "message": format!("malformed TickRequest: {e}") }
            });
            // Best effort: there is nobody left to report a publish failure to.
            let _ = broker
                .publish(&reply_topic, Event::new(&reply_topic, "plugin.poller", err))
                .await;
            return Ok(());
        }
    };
    let host = Arc::new(BrokerPollerHost::new(
        plugin_id,
        request.agent_id.clone(),
        request.job_id.clone(),
        broker.clone(),
    )) as Arc<dyn PollerHost>;
    let payload = match handler.tick(request, host).await {
        Ok(ack) => {
            let reply = TickReply::from_tick_ack(ack);
            json!({ "result": reply })
        }
        Err(e) => {
            let (code, message) = poller_error_to_rpc(e);
            json!({ "error": { "code": code, "message": message } })
        }
    };
    // Best effort, as above.
    let _ = broker
        .publish(
            &reply_topic,
            Event::new(&reply_topic, "plugin.poller", payload),
        )
        .await;
    Ok(())
}
/// Maps a [`PollerError`] to an RPC `(code, message)` pair.
///
/// The message is the error's `Display` text; the code depends on the error's
/// class: transient → `-32001`, permanent → `-32002`, config → `-32602`.
fn poller_error_to_rpc(err: PollerError) -> (i32, String) {
    use nexo_poller::error::ErrorClass;

    let rendered = err.to_string();
    match err.classify() {
        ErrorClass::Transient => (-32001, rendered),
        ErrorClass::Permanent => (-32002, rendered),
        ErrorClass::Config => (-32602, rendered),
    }
}
/// Errors returned by `serve_one_tick`.
#[derive(Debug, thiserror::Error)]
pub enum ServeError {
/// The incoming request carried no reply topic, so no reply can be published.
#[error("incoming tick request has no reply_to topic; cannot respond")]
MissingReplyTo,
}
/// `PollerHost` implementation for the ephemeral lifecycle.
///
/// Logging and metrics go to `tracing`, credentials and LLM calls are rejected
/// with an RPC error, and publishing works only when a broker has been attached.
pub struct EphemeralHost {
/// Plugin identifier, recorded on emitted tracing events.
plugin_id: String,
/// Job identifier, recorded on emitted tracing events.
job_id: String,
/// Agent identifier, recorded on emitted log events.
agent_id: String,
/// Optional broker; `None` until `with_broker` is called.
broker: Option<nexo_broker::AnyBroker>,
}
impl EphemeralHost {
    /// Creates a host with no broker attached.
    ///
    /// Note the argument order: `plugin_id`, then `job_id`, then `agent_id`.
    pub fn new(
        plugin_id: impl Into<String>,
        job_id: impl Into<String>,
        agent_id: impl Into<String>,
    ) -> Self {
        let plugin_id = plugin_id.into();
        let job_id = job_id.into();
        let agent_id = agent_id.into();
        Self {
            plugin_id,
            job_id,
            agent_id,
            broker: None,
        }
    }

    /// Attaches a broker so that `broker_publish` can deliver events.
    pub fn with_broker(self, broker: nexo_broker::AnyBroker) -> Self {
        Self {
            broker: Some(broker),
            ..self
        }
    }
}
#[async_trait]
impl PollerHost for EphemeralHost {
async fn broker_publish(&self, topic: String, payload: Vec<u8>) -> Result<(), HostError> {
let broker = self.broker.as_ref().ok_or_else(|| HostError::Rpc {
code: -32601,
message:
"EphemeralHost has no broker — set NEXO_BROKER_URL + attach via with_broker(...)"
.into(),
})?;
let value: Value = serde_json::from_slice(&payload).unwrap_or(Value::Null);
let event = nexo_broker::Event::new(&topic, "plugin.poller.ephemeral", value);
use nexo_broker::BrokerHandle;
broker
.publish(&topic, event)
.await
.map_err(|e| HostError::BrokerUnavailable(e.to_string()))
}
async fn credentials_get(&self, channel: String) -> Result<Value, HostError> {
Err(HostError::Rpc {
code: -32601,
message: format!(
"credentials_get('{channel}') unavailable in ephemeral lifecycle — switch to long_lived"
),
})
}
async fn log(&self, level: LogLevel, message: String, fields: Value) -> Result<(), HostError> {
match level {
LogLevel::Trace => {
tracing::trace!(plugin = %self.plugin_id, job_id = %self.job_id, agent_id = %self.agent_id, %message, ?fields, "ephemeral poller log")
}
LogLevel::Debug => {
tracing::debug!(plugin = %self.plugin_id, job_id = %self.job_id, agent_id = %self.agent_id, %message, ?fields, "ephemeral poller log")
}
LogLevel::Info => {
tracing::info!(plugin = %self.plugin_id, job_id = %self.job_id, agent_id = %self.agent_id, %message, ?fields, "ephemeral poller log")
}
LogLevel::Warn => {
tracing::warn!(plugin = %self.plugin_id, job_id = %self.job_id, agent_id = %self.agent_id, %message, ?fields, "ephemeral poller log")
}
LogLevel::Error => {
tracing::error!(plugin = %self.plugin_id, job_id = %self.job_id, agent_id = %self.agent_id, %message, ?fields, "ephemeral poller log")
}
}
Ok(())
}
async fn metric_inc(&self, name: String, labels: Value) -> Result<(), HostError> {
tracing::info!(
metric = %name,
plugin = %self.plugin_id,
job_id = %self.job_id,
?labels,
"ephemeral poller metric_inc",
);
Ok(())
}
async fn llm_invoke(&self, _request: LlmInvokeRequest) -> Result<LlmInvokeResponse, HostError> {
Err(HostError::Rpc {
code: -32601,
message: "llm_invoke unavailable in ephemeral lifecycle — switch to long_lived".into(),
})
}
}
pub async fn serve_one_ephemeral_tick(
plugin_id: &str,
handler: Arc<dyn PollerHandler>,
broker: Option<nexo_broker::AnyBroker>,
) -> Result<(), EphemeralError> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin);
let mut line = String::new();
let n = reader
.read_line(&mut line)
.await
.map_err(|e| EphemeralError::Io(e.to_string()))?;
if n == 0 {
return Err(EphemeralError::StdinClosed);
}
let envelope: Value = serde_json::from_str(line.trim())
.map_err(|e| EphemeralError::ParseEnvelope(e.to_string()))?;
let params = envelope.get("params").cloned().unwrap_or(Value::Null);
let request: TickRequest = serde_json::from_value(params)
.map_err(|e| EphemeralError::ParseEnvelope(format!("TickRequest: {e}")))?;
let mut host = EphemeralHost::new(plugin_id, request.job_id.clone(), request.agent_id.clone());
if let Some(b) = broker {
host = host.with_broker(b);
}
let host_arc: Arc<dyn PollerHost> = Arc::new(host);
let reply_envelope = match handler.tick(request, host_arc).await {
Ok(ack) => {
let reply = TickReply::from_tick_ack(ack);
json!({ "result": reply })
}
Err(e) => {
let (code, message) = poller_error_to_rpc(e);
json!({ "error": { "code": code, "message": message } })
}
};
let mut stdout = tokio::io::stdout();
let bytes = serde_json::to_vec(&reply_envelope)
.map_err(|e| EphemeralError::SerializeReply(e.to_string()))?;
stdout
.write_all(&bytes)
.await
.map_err(|e| EphemeralError::Io(e.to_string()))?;
stdout
.write_all(b"\n")
.await
.map_err(|e| EphemeralError::Io(e.to_string()))?;
stdout
.flush()
.await
.map_err(|e| EphemeralError::Io(e.to_string()))?;
Ok(())
}
/// Errors returned by `serve_one_ephemeral_tick`.
#[derive(Debug, thiserror::Error)]
pub enum EphemeralError {
/// Reading stdin returned zero bytes before any request line was received.
#[error("stdin closed before TickRequest arrived")]
StdinClosed,
/// The input line was not valid JSON, or its `params` did not decode into a `TickRequest`.
#[error("envelope parse: {0}")]
ParseEnvelope(String),
/// The reply envelope could not be serialized to JSON.
#[error("reply serialize: {0}")]
SerializeReply(String),
/// Reading stdin or writing/flushing stdout failed; carries the I/O error text.
#[error("io: {0}")]
Io(String),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a request with fixed identifiers, varying only the cursor and
    /// the interval hint.
    fn request_with(cursor: Option<String>, interval_hint_secs: u64) -> TickRequest {
        TickRequest {
            kind: "rss".into(),
            job_id: "j1".into(),
            agent_id: "ana".into(),
            cursor,
            config: Value::Null,
            now: "2026-05-17T10:00:00Z".into(),
            interval_hint_secs,
        }
    }

    /// An encoded cursor decodes back to the original bytes.
    #[test]
    fn tick_request_cursor_bytes_round_trip() {
        let raw = b"hello world";
        let encoded = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(raw);
        let req = request_with(Some(encoded), 60);
        let decoded = req.cursor_bytes().unwrap().unwrap();
        assert_eq!(decoded, raw);
    }

    /// A request without a cursor yields `Ok(None)`.
    #[test]
    fn tick_request_cursor_none() {
        let req = request_with(None, 0);
        assert!(req.cursor_bytes().unwrap().is_none());
    }

    /// An undecodable cursor is reported as a config error.
    #[test]
    fn tick_request_bad_cursor_errors() {
        let req = request_with(Some("!!not_b64!!".into()), 0);
        let err = req.cursor_bytes().unwrap_err();
        assert!(matches!(err, PollerError::Config { .. }));
    }

    /// `from_tick_ack` base64-encodes the cursor and converts the interval to seconds.
    #[test]
    fn tick_reply_from_tick_ack_encodes_cursor() {
        let metrics = TickMetrics {
            items_seen: 3,
            items_dispatched: 1,
        };
        let ack = TickAck {
            next_cursor: Some(b"world".to_vec()),
            next_interval_hint: Some(Duration::from_secs(120)),
            metrics: Some(metrics),
        };
        let reply = TickReply::from_tick_ack(ack);
        assert_eq!(reply.next_cursor.as_deref(), Some("d29ybGQ"));
        assert_eq!(reply.next_interval_secs, Some(120));
        let reported = reply.metrics.unwrap();
        assert_eq!(reported.items_seen, 3);
    }

    /// Each error class maps to its dedicated RPC code.
    #[test]
    fn poller_error_classification_into_rpc_code() {
        let cases = vec![
            (PollerError::Transient(anyhow::anyhow!("503")), -32001),
            (PollerError::Permanent(anyhow::anyhow!("revoked")), -32002),
            (
                PollerError::Config {
                    job: "x".into(),
                    reason: "y".into(),
                },
                -32602,
            ),
        ];
        for (err, expected) in cases {
            let (code, _) = poller_error_to_rpc(err);
            assert_eq!(code, expected);
        }
    }
}