Skip to main content

nexo_poller_tools/
lib.rs

1//! LLM-callable tools for the poller subsystem.
2//!
3//! Lives outside `nexo-core` so the dependency graph stays acyclic
4//! (core → poller → plugin-google → core would loop). `main.rs`
5//! pulls this crate alongside `nexo-core` and registers the tools
6//! per agent.
7//!
8//! Six tools, all read + control on already-declared jobs:
9//!  - `pollers_list`    list every job + state
10//!  - `pollers_show`    detail for one job
11//!  - `pollers_run`     manual tick (bypasses schedule + lease)
12//!  - `pollers_pause`   set paused = 1
13//!  - `pollers_resume`  set paused = 0
14//!  - `pollers_reset`   wipe cursor / consecutive_errors
15//!
16//! Create / delete are intentionally not exposed: a prompt-injection
17//! could plant a `webhook_poll` job aimed at internal infra. Operators
18//! still own `pollers.yaml` + `agent pollers reload`.
19
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use nexo_core::agent::context::AgentContext;
24use nexo_core::agent::tool_registry::{ToolHandler, ToolRegistry};
25use nexo_llm::ToolDef;
26use nexo_poller::PollerRunner;
27use serde_json::{json, Value};
28
29pub struct PollersListTool {
30    runner: Arc<PollerRunner>,
31}
32impl PollersListTool {
33    pub fn new(runner: Arc<PollerRunner>) -> Self {
34        Self { runner }
35    }
36    pub fn tool_def() -> ToolDef {
37        ToolDef {
38            name: "pollers_list".to_string(),
39            description:
40                "List every configured poll job (gmail, rss, calendar, …) with its kind, agent owner, paused flag, last status and counters."
41                    .into(),
42            parameters: json!({ "type": "object", "properties": {} }),
43        }
44    }
45}
46#[async_trait]
47impl ToolHandler for PollersListTool {
48    async fn call(&self, _ctx: &AgentContext, _args: Value) -> anyhow::Result<Value> {
49        let jobs = self.runner.list_jobs().await?;
50        Ok(serde_json::to_value(&jobs)?)
51    }
52}
53
54pub struct PollersShowTool {
55    runner: Arc<PollerRunner>,
56}
57impl PollersShowTool {
58    pub fn new(runner: Arc<PollerRunner>) -> Self {
59        Self { runner }
60    }
61    pub fn tool_def() -> ToolDef {
62        ToolDef {
63            name: "pollers_show".to_string(),
64            description: "Inspect a single poll job by id.".into(),
65            parameters: json!({
66                "type": "object",
67                "properties": {
68                    "id": { "type": "string", "description": "Job id (matches pollers.yaml)" }
69                },
70                "required": ["id"]
71            }),
72        }
73    }
74}
75#[async_trait]
76impl ToolHandler for PollersShowTool {
77    async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
78        let id = args["id"]
79            .as_str()
80            .ok_or_else(|| anyhow::anyhow!("pollers_show requires `id`"))?;
81        let jobs = self.runner.list_jobs().await?;
82        let job = jobs
83            .into_iter()
84            .find(|j| j.id == id)
85            .ok_or_else(|| anyhow::anyhow!("unknown poll job '{id}'"))?;
86        Ok(serde_json::to_value(&job)?)
87    }
88}
89
90pub struct PollersRunTool {
91    runner: Arc<PollerRunner>,
92}
93impl PollersRunTool {
94    pub fn new(runner: Arc<PollerRunner>) -> Self {
95        Self { runner }
96    }
97    pub fn tool_def() -> ToolDef {
98        ToolDef {
99            name: "pollers_run".to_string(),
100            description:
101                "Trigger one tick of a poll job out-of-band (bypasses schedule + lease). Returns items_seen / items_dispatched / deliveries."
102                    .into(),
103            parameters: json!({
104                "type": "object",
105                "properties": {
106                    "id": { "type": "string", "description": "Job id" }
107                },
108                "required": ["id"]
109            }),
110        }
111    }
112}
113#[async_trait]
114impl ToolHandler for PollersRunTool {
115    async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
116        let id = args["id"]
117            .as_str()
118            .ok_or_else(|| anyhow::anyhow!("pollers_run requires `id`"))?;
119        let ack = self.runner.run_once(id).await?;
120        let metrics = ack.metrics.unwrap_or_default();
121        Ok(json!({
122            "ok": true,
123            "items_seen": metrics.items_seen,
124            "items_dispatched": metrics.items_dispatched,
125        }))
126    }
127}
128
129pub struct PollersPauseTool {
130    runner: Arc<PollerRunner>,
131}
132impl PollersPauseTool {
133    pub fn new(runner: Arc<PollerRunner>) -> Self {
134        Self { runner }
135    }
136    pub fn tool_def() -> ToolDef {
137        ToolDef {
138            name: "pollers_pause".to_string(),
139            description:
140                "Pause a poll job. The schedule stops firing until pollers_resume is called.".into(),
141            parameters: json!({
142                "type": "object",
143                "properties": {
144                    "id": { "type": "string", "description": "Job id" }
145                },
146                "required": ["id"]
147            }),
148        }
149    }
150}
151#[async_trait]
152impl ToolHandler for PollersPauseTool {
153    async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
154        let id = args["id"]
155            .as_str()
156            .ok_or_else(|| anyhow::anyhow!("pollers_pause requires `id`"))?;
157        self.runner.set_paused(id, true).await?;
158        Ok(json!({"ok": true, "paused": true}))
159    }
160}
161
162pub struct PollersResumeTool {
163    runner: Arc<PollerRunner>,
164}
165impl PollersResumeTool {
166    pub fn new(runner: Arc<PollerRunner>) -> Self {
167        Self { runner }
168    }
169    pub fn tool_def() -> ToolDef {
170        ToolDef {
171            name: "pollers_resume".to_string(),
172            description: "Resume a paused poll job.".into(),
173            parameters: json!({
174                "type": "object",
175                "properties": {
176                    "id": { "type": "string", "description": "Job id" }
177                },
178                "required": ["id"]
179            }),
180        }
181    }
182}
183#[async_trait]
184impl ToolHandler for PollersResumeTool {
185    async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
186        let id = args["id"]
187            .as_str()
188            .ok_or_else(|| anyhow::anyhow!("pollers_resume requires `id`"))?;
189        self.runner.set_paused(id, false).await?;
190        Ok(json!({"ok": true, "paused": false}))
191    }
192}
193
194pub struct PollersResetTool {
195    runner: Arc<PollerRunner>,
196}
197impl PollersResetTool {
198    pub fn new(runner: Arc<PollerRunner>) -> Self {
199        Self { runner }
200    }
201    pub fn tool_def() -> ToolDef {
202        ToolDef {
203            name: "pollers_reset".to_string(),
204            description:
205                "Reset the cursor and error state of a poll job. Destructive: the next tick re-baselines (gmail will scan from `newer_than`, calendar will fetch a fresh syncToken). Confirm intent before calling."
206                    .into(),
207            parameters: json!({
208                "type": "object",
209                "properties": {
210                    "id": { "type": "string", "description": "Job id" }
211                },
212                "required": ["id"]
213            }),
214        }
215    }
216}
217#[async_trait]
218impl ToolHandler for PollersResetTool {
219    async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
220        let id = args["id"]
221            .as_str()
222            .ok_or_else(|| anyhow::anyhow!("pollers_reset requires `id`"))?;
223        self.runner.reset_cursor(id).await?;
224        Ok(json!({"ok": true, "reset": true}))
225    }
226}
227
228/// Adapter wrapping a `nexo_poller::CustomToolHandler` into the
229/// `nexo_core::ToolHandler` shape. Captures the runner so each
230/// LLM call gets a fresh handle.
231struct CustomToolAdapter {
232    runner: Arc<PollerRunner>,
233    inner: Arc<dyn nexo_poller::CustomToolHandler>,
234}
235#[async_trait]
236impl ToolHandler for CustomToolAdapter {
237    async fn call(&self, ctx: &AgentContext, mut args: Value) -> anyhow::Result<Value> {
238        // Inject the calling agent's id so handlers can resolve
239        // per-agent credentials without trusting LLM-supplied args.
240        // The handler reads `_agent_id` from the value object;
241        // arbitrary args from the LLM cannot override it because
242        // we set after the fact.
243        if let Value::Object(map) = &mut args {
244            map.insert("_agent_id".to_string(), Value::String(ctx.agent_id.clone()));
245        }
246        self.inner.call(Arc::clone(&self.runner), args).await
247    }
248}
249
250/// Wire the six generic `pollers_*` tools plus every per-kind custom
251/// tool exposed by the registered `Poller` impls. Called from `main.rs`
252/// per agent.
253pub fn register_all(registry: &ToolRegistry, runner: Arc<PollerRunner>) {
254    registry.register(
255        PollersListTool::tool_def(),
256        PollersListTool::new(runner.clone()),
257    );
258    registry.register(
259        PollersShowTool::tool_def(),
260        PollersShowTool::new(runner.clone()),
261    );
262    registry.register(
263        PollersRunTool::tool_def(),
264        PollersRunTool::new(runner.clone()),
265    );
266    registry.register(
267        PollersPauseTool::tool_def(),
268        PollersPauseTool::new(runner.clone()),
269    );
270    registry.register(
271        PollersResumeTool::tool_def(),
272        PollersResumeTool::new(runner.clone()),
273    );
274    registry.register(
275        PollersResetTool::tool_def(),
276        PollersResetTool::new(runner.clone()),
277    );
278
279    // Per-kind custom tools — each registered Poller impl can return
280    // a Vec<CustomToolSpec>. Empty by default.
281    for spec in runner.collect_custom_tools() {
282        registry.register(
283            spec.def,
284            CustomToolAdapter {
285                runner: Arc::clone(&runner),
286                inner: spec.handler,
287            },
288        );
289    }
290}