use async_trait::async_trait;
use serde::Deserialize;
use serde_json::{json, Value};
use nexo_auth::handle::{Channel, GOOGLE, TELEGRAM, WHATSAPP};
use nexo_llm::{ChatMessage, ChatRequest, ChatRole, ResponseContent};
use crate::error::PollerError;
use crate::poller::{OutboundDelivery, PollContext, Poller, TickOutcome};
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct AgentTurnJobConfig {
pub llm: LlmRef,
pub system_prompt: Option<String>,
pub user_prompt: String,
pub deliver: DeliverCfg,
#[serde(default)]
pub language: Option<String>,
#[serde(default)]
pub max_tokens: Option<u32>,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct LlmRef {
pub provider: String,
pub model: String,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct DeliverCfg {
pub channel: String,
pub recipient: String,
}
pub struct AgentTurnPoller;
impl AgentTurnPoller {
pub fn new() -> Self {
Self
}
}
impl Default for AgentTurnPoller {
fn default() -> Self {
Self::new()
}
}
fn parse_channel(s: &str) -> Result<Channel, PollerError> {
match s.trim().to_ascii_lowercase().as_str() {
"whatsapp" => Ok(WHATSAPP),
"telegram" => Ok(TELEGRAM),
"google" => Ok(GOOGLE),
other => Err(PollerError::Permanent(anyhow::anyhow!(
"agent_turn: unknown channel '{other}' (supported: whatsapp, telegram, google)"
))),
}
}
fn validate_lang(raw: &str) -> Option<String> {
let cleaned: String = raw
.chars()
.filter(|c| !c.is_control() && *c != '\n' && *c != '\r')
.collect();
let trimmed = cleaned.trim();
if trimmed.is_empty() {
return None;
}
Some(trimmed.chars().take(64).collect())
}
#[async_trait]
impl Poller for AgentTurnPoller {
fn kind(&self) -> &'static str {
"agent_turn"
}
fn description(&self) -> &'static str {
"Runs an LLM turn on a schedule and dispatches the reply to a configured channel. \
Cron-style replacement for hand-coded modules that just want 'fire prompt at time T'."
}
fn validate(&self, config: &Value) -> Result<(), PollerError> {
let cfg: AgentTurnJobConfig =
serde_json::from_value(config.clone()).map_err(|e| PollerError::Config {
job: "<agent_turn>".into(),
reason: e.to_string(),
})?;
parse_channel(&cfg.deliver.channel)?;
if cfg.user_prompt.trim().is_empty() {
return Err(PollerError::Config {
job: "<agent_turn>".into(),
reason: "user_prompt must be non-empty".into(),
});
}
Ok(())
}
async fn tick(&self, ctx: &PollContext) -> Result<TickOutcome, PollerError> {
let cfg: AgentTurnJobConfig =
serde_json::from_value(ctx.config.clone()).map_err(|e| PollerError::Config {
job: ctx.job_id.clone(),
reason: e.to_string(),
})?;
let registry = ctx
.llm_registry
.as_ref()
.ok_or_else(|| PollerError::Config {
job: ctx.job_id.clone(),
reason: "agent_turn requires the runner to be wired with with_llm(...) at boot"
.into(),
})?;
let llm_config = ctx.llm_config.as_ref().ok_or_else(|| PollerError::Config {
job: ctx.job_id.clone(),
reason: "agent_turn requires LlmConfig — wire with_llm(registry, config)".into(),
})?;
let model_cfg = nexo_config::types::agents::ModelConfig {
provider: cfg.llm.provider.clone(),
model: cfg.llm.model.clone(),
};
let client = registry
.build(llm_config, &model_cfg)
.map_err(|e| PollerError::Config {
job: ctx.job_id.clone(),
reason: format!("LLM client build: {e}"),
})?;
let mut messages: Vec<ChatMessage> = Vec::new();
if let Some(sp) = cfg.system_prompt.as_deref() {
let sp = sp.trim();
if !sp.is_empty() {
messages.push(ChatMessage {
role: ChatRole::System,
content: sp.to_string(),
attachments: Vec::new(),
tool_call_id: None,
name: None,
tool_calls: Vec::new(),
});
}
}
if let Some(lang) = cfg.language.as_deref().and_then(validate_lang) {
messages.push(ChatMessage {
role: ChatRole::System,
content: format!(
"# OUTPUT LANGUAGE\n\nRespond in {lang}. Workspace docs and tool \
descriptions are in English; reply to the user in {lang}."
),
attachments: Vec::new(),
tool_call_id: None,
name: None,
tool_calls: Vec::new(),
});
}
messages.push(ChatMessage {
role: ChatRole::User,
content: cfg.user_prompt.clone(),
attachments: Vec::new(),
tool_call_id: None,
name: None,
tool_calls: Vec::new(),
});
let mut req = ChatRequest::new(&cfg.llm.model, messages);
if let Some(mt) = cfg.max_tokens {
req.max_tokens = mt;
}
let response = tokio::select! {
r = client.chat(req) => r,
_ = ctx.cancel.cancelled() => {
return Ok(TickOutcome::default());
}
}
.map_err(|e| {
let msg = e.to_string();
let is_perm = msg.contains("401")
|| msg.contains("403")
|| msg.contains("not registered")
|| msg.contains("not present in config.providers");
if is_perm {
PollerError::Permanent(e)
} else {
PollerError::Transient(e)
}
})?;
let text = match response.content {
ResponseContent::Text(t) => t,
ResponseContent::ToolCalls(_) => {
return Err(PollerError::Permanent(anyhow::anyhow!(
"agent_turn does not support tool calls — the LLM returned tool_use; \
remove tools from the prompt or use a real agent for this workflow"
)));
}
};
let trimmed = text.trim();
if trimmed.is_empty() {
tracing::warn!(
job = %ctx.job_id,
"agent_turn produced empty text — skipping delivery"
);
return Ok(TickOutcome {
items_seen: 1,
items_dispatched: 0,
deliver: Vec::new(),
next_cursor: None,
next_interval_hint: None,
});
}
let channel = parse_channel(&cfg.deliver.channel)?;
let payload = json!({
"kind": "text",
"to": cfg.deliver.recipient,
"text": trimmed,
});
Ok(TickOutcome {
items_seen: 1,
items_dispatched: 1,
deliver: vec![OutboundDelivery {
channel,
recipient: cfg.deliver.recipient.clone(),
payload,
}],
next_cursor: None,
next_interval_hint: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn validate_accepts_minimal_config() {
let cfg = json!({
"llm": { "provider": "anthropic", "model": "claude-haiku-4-5" },
"user_prompt": "summarise the day",
"deliver": { "channel": "telegram", "recipient": "-100" }
});
AgentTurnPoller::new()
.validate(&cfg)
.expect("minimal config validates");
}
#[test]
fn validate_rejects_unknown_channel() {
let cfg = json!({
"llm": { "provider": "anthropic", "model": "x" },
"user_prompt": "p",
"deliver": { "channel": "smtp", "recipient": "a" }
});
let err = AgentTurnPoller::new().validate(&cfg).unwrap_err();
assert!(err.to_string().contains("smtp"));
}
#[test]
fn validate_rejects_empty_user_prompt() {
let cfg = json!({
"llm": { "provider": "anthropic", "model": "x" },
"user_prompt": " ",
"deliver": { "channel": "telegram", "recipient": "a" }
});
let err = AgentTurnPoller::new().validate(&cfg).unwrap_err();
assert!(err.to_string().contains("user_prompt"));
}
#[test]
fn validate_rejects_unknown_field() {
let cfg = json!({
"llm": { "provider": "anthropic", "model": "x" },
"user_prompt": "p",
"deliver": { "channel": "telegram", "recipient": "a" },
"bogus_key": 1
});
AgentTurnPoller::new()
.validate(&cfg)
.expect_err("deny_unknown_fields rejects typos");
}
#[test]
fn lang_sanitiser_strips_newlines() {
assert_eq!(validate_lang("es\nbomb"), Some("esbomb".to_string()));
assert_eq!(validate_lang(" "), None);
assert_eq!(validate_lang(""), None);
}
}