use std::sync::Arc;
use async_trait::async_trait;
use nexo_core::agent::context::AgentContext;
use nexo_core::agent::tool_registry::{ToolHandler, ToolRegistry};
use nexo_llm::ToolDef;
use nexo_poller::PollerRunner;
use serde_json::{json, Value};
pub struct PollersListTool {
runner: Arc<PollerRunner>,
}
impl PollersListTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_list".to_string(),
description:
"List every configured poll job (gmail, rss, calendar, …) with its kind, agent owner, paused flag, last status and counters."
.into(),
parameters: json!({ "type": "object", "properties": {} }),
}
}
}
#[async_trait]
impl ToolHandler for PollersListTool {
async fn call(&self, _ctx: &AgentContext, _args: Value) -> anyhow::Result<Value> {
let jobs = self.runner.list_jobs().await?;
Ok(serde_json::to_value(&jobs)?)
}
}
pub struct PollersShowTool {
runner: Arc<PollerRunner>,
}
impl PollersShowTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_show".to_string(),
description: "Inspect a single poll job by id.".into(),
parameters: json!({
"type": "object",
"properties": {
"id": { "type": "string", "description": "Job id (matches pollers.yaml)" }
},
"required": ["id"]
}),
}
}
}
#[async_trait]
impl ToolHandler for PollersShowTool {
async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("pollers_show requires `id`"))?;
let jobs = self.runner.list_jobs().await?;
let job = jobs
.into_iter()
.find(|j| j.id == id)
.ok_or_else(|| anyhow::anyhow!("unknown poll job '{id}'"))?;
Ok(serde_json::to_value(&job)?)
}
}
pub struct PollersRunTool {
runner: Arc<PollerRunner>,
}
impl PollersRunTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_run".to_string(),
description:
"Trigger one tick of a poll job out-of-band (bypasses schedule + lease). Returns items_seen / items_dispatched / deliveries."
.into(),
parameters: json!({
"type": "object",
"properties": {
"id": { "type": "string", "description": "Job id" }
},
"required": ["id"]
}),
}
}
}
#[async_trait]
impl ToolHandler for PollersRunTool {
async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("pollers_run requires `id`"))?;
let ack = self.runner.run_once(id).await?;
let metrics = ack.metrics.unwrap_or_default();
Ok(json!({
"ok": true,
"items_seen": metrics.items_seen,
"items_dispatched": metrics.items_dispatched,
}))
}
}
pub struct PollersPauseTool {
runner: Arc<PollerRunner>,
}
impl PollersPauseTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_pause".to_string(),
description:
"Pause a poll job. The schedule stops firing until pollers_resume is called.".into(),
parameters: json!({
"type": "object",
"properties": {
"id": { "type": "string", "description": "Job id" }
},
"required": ["id"]
}),
}
}
}
#[async_trait]
impl ToolHandler for PollersPauseTool {
async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("pollers_pause requires `id`"))?;
self.runner.set_paused(id, true).await?;
Ok(json!({"ok": true, "paused": true}))
}
}
pub struct PollersResumeTool {
runner: Arc<PollerRunner>,
}
impl PollersResumeTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_resume".to_string(),
description: "Resume a paused poll job.".into(),
parameters: json!({
"type": "object",
"properties": {
"id": { "type": "string", "description": "Job id" }
},
"required": ["id"]
}),
}
}
}
#[async_trait]
impl ToolHandler for PollersResumeTool {
async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("pollers_resume requires `id`"))?;
self.runner.set_paused(id, false).await?;
Ok(json!({"ok": true, "paused": false}))
}
}
pub struct PollersResetTool {
runner: Arc<PollerRunner>,
}
impl PollersResetTool {
pub fn new(runner: Arc<PollerRunner>) -> Self {
Self { runner }
}
pub fn tool_def() -> ToolDef {
ToolDef {
name: "pollers_reset".to_string(),
description:
"Reset the cursor and error state of a poll job. Destructive: the next tick re-baselines (gmail will scan from `newer_than`, calendar will fetch a fresh syncToken). Confirm intent before calling."
.into(),
parameters: json!({
"type": "object",
"properties": {
"id": { "type": "string", "description": "Job id" }
},
"required": ["id"]
}),
}
}
}
#[async_trait]
impl ToolHandler for PollersResetTool {
async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("pollers_reset requires `id`"))?;
self.runner.reset_cursor(id).await?;
Ok(json!({"ok": true, "reset": true}))
}
}
struct CustomToolAdapter {
runner: Arc<PollerRunner>,
inner: Arc<dyn nexo_poller::CustomToolHandler>,
}
#[async_trait]
impl ToolHandler for CustomToolAdapter {
async fn call(&self, ctx: &AgentContext, mut args: Value) -> anyhow::Result<Value> {
if let Value::Object(map) = &mut args {
map.insert("_agent_id".to_string(), Value::String(ctx.agent_id.clone()));
}
self.inner.call(Arc::clone(&self.runner), args).await
}
}
pub fn register_all(registry: &ToolRegistry, runner: Arc<PollerRunner>) {
registry.register(
PollersListTool::tool_def(),
PollersListTool::new(runner.clone()),
);
registry.register(
PollersShowTool::tool_def(),
PollersShowTool::new(runner.clone()),
);
registry.register(
PollersRunTool::tool_def(),
PollersRunTool::new(runner.clone()),
);
registry.register(
PollersPauseTool::tool_def(),
PollersPauseTool::new(runner.clone()),
);
registry.register(
PollersResumeTool::tool_def(),
PollersResumeTool::new(runner.clone()),
);
registry.register(
PollersResetTool::tool_def(),
PollersResetTool::new(runner.clone()),
);
for spec in runner.collect_custom_tools() {
registry.register(
spec.def,
CustomToolAdapter {
runner: Arc::clone(&runner),
inner: spec.handler,
},
);
}
}