1use std::sync::Arc;
21
22use async_trait::async_trait;
23use nexo_core::agent::context::AgentContext;
24use nexo_core::agent::tool_registry::{ToolHandler, ToolRegistry};
25use nexo_llm::ToolDef;
26use nexo_poller::PollerRunner;
27use serde_json::{json, Value};
28
29pub struct PollersListTool {
30 runner: Arc<PollerRunner>,
31}
32impl PollersListTool {
33 pub fn new(runner: Arc<PollerRunner>) -> Self {
34 Self { runner }
35 }
36 pub fn tool_def() -> ToolDef {
37 ToolDef {
38 name: "pollers_list".to_string(),
39 description:
40 "List every configured poll job (gmail, rss, calendar, …) with its kind, agent owner, paused flag, last status and counters."
41 .into(),
42 parameters: json!({ "type": "object", "properties": {} }),
43 }
44 }
45}
46#[async_trait]
47impl ToolHandler for PollersListTool {
48 async fn call(&self, _ctx: &AgentContext, _args: Value) -> anyhow::Result<Value> {
49 let jobs = self.runner.list_jobs().await?;
50 Ok(serde_json::to_value(&jobs)?)
51 }
52}
53
54pub struct PollersShowTool {
55 runner: Arc<PollerRunner>,
56}
57impl PollersShowTool {
58 pub fn new(runner: Arc<PollerRunner>) -> Self {
59 Self { runner }
60 }
61 pub fn tool_def() -> ToolDef {
62 ToolDef {
63 name: "pollers_show".to_string(),
64 description: "Inspect a single poll job by id.".into(),
65 parameters: json!({
66 "type": "object",
67 "properties": {
68 "id": { "type": "string", "description": "Job id (matches pollers.yaml)" }
69 },
70 "required": ["id"]
71 }),
72 }
73 }
74}
75#[async_trait]
76impl ToolHandler for PollersShowTool {
77 async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
78 let id = args["id"]
79 .as_str()
80 .ok_or_else(|| anyhow::anyhow!("pollers_show requires `id`"))?;
81 let jobs = self.runner.list_jobs().await?;
82 let job = jobs
83 .into_iter()
84 .find(|j| j.id == id)
85 .ok_or_else(|| anyhow::anyhow!("unknown poll job '{id}'"))?;
86 Ok(serde_json::to_value(&job)?)
87 }
88}
89
90pub struct PollersRunTool {
91 runner: Arc<PollerRunner>,
92}
93impl PollersRunTool {
94 pub fn new(runner: Arc<PollerRunner>) -> Self {
95 Self { runner }
96 }
97 pub fn tool_def() -> ToolDef {
98 ToolDef {
99 name: "pollers_run".to_string(),
100 description:
101 "Trigger one tick of a poll job out-of-band (bypasses schedule + lease). Returns items_seen / items_dispatched / deliveries."
102 .into(),
103 parameters: json!({
104 "type": "object",
105 "properties": {
106 "id": { "type": "string", "description": "Job id" }
107 },
108 "required": ["id"]
109 }),
110 }
111 }
112}
113#[async_trait]
114impl ToolHandler for PollersRunTool {
115 async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
116 let id = args["id"]
117 .as_str()
118 .ok_or_else(|| anyhow::anyhow!("pollers_run requires `id`"))?;
119 let ack = self.runner.run_once(id).await?;
120 let metrics = ack.metrics.unwrap_or_default();
121 Ok(json!({
122 "ok": true,
123 "items_seen": metrics.items_seen,
124 "items_dispatched": metrics.items_dispatched,
125 }))
126 }
127}
128
129pub struct PollersPauseTool {
130 runner: Arc<PollerRunner>,
131}
132impl PollersPauseTool {
133 pub fn new(runner: Arc<PollerRunner>) -> Self {
134 Self { runner }
135 }
136 pub fn tool_def() -> ToolDef {
137 ToolDef {
138 name: "pollers_pause".to_string(),
139 description:
140 "Pause a poll job. The schedule stops firing until pollers_resume is called.".into(),
141 parameters: json!({
142 "type": "object",
143 "properties": {
144 "id": { "type": "string", "description": "Job id" }
145 },
146 "required": ["id"]
147 }),
148 }
149 }
150}
151#[async_trait]
152impl ToolHandler for PollersPauseTool {
153 async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
154 let id = args["id"]
155 .as_str()
156 .ok_or_else(|| anyhow::anyhow!("pollers_pause requires `id`"))?;
157 self.runner.set_paused(id, true).await?;
158 Ok(json!({"ok": true, "paused": true}))
159 }
160}
161
162pub struct PollersResumeTool {
163 runner: Arc<PollerRunner>,
164}
165impl PollersResumeTool {
166 pub fn new(runner: Arc<PollerRunner>) -> Self {
167 Self { runner }
168 }
169 pub fn tool_def() -> ToolDef {
170 ToolDef {
171 name: "pollers_resume".to_string(),
172 description: "Resume a paused poll job.".into(),
173 parameters: json!({
174 "type": "object",
175 "properties": {
176 "id": { "type": "string", "description": "Job id" }
177 },
178 "required": ["id"]
179 }),
180 }
181 }
182}
183#[async_trait]
184impl ToolHandler for PollersResumeTool {
185 async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
186 let id = args["id"]
187 .as_str()
188 .ok_or_else(|| anyhow::anyhow!("pollers_resume requires `id`"))?;
189 self.runner.set_paused(id, false).await?;
190 Ok(json!({"ok": true, "paused": false}))
191 }
192}
193
194pub struct PollersResetTool {
195 runner: Arc<PollerRunner>,
196}
197impl PollersResetTool {
198 pub fn new(runner: Arc<PollerRunner>) -> Self {
199 Self { runner }
200 }
201 pub fn tool_def() -> ToolDef {
202 ToolDef {
203 name: "pollers_reset".to_string(),
204 description:
205 "Reset the cursor and error state of a poll job. Destructive: the next tick re-baselines (gmail will scan from `newer_than`, calendar will fetch a fresh syncToken). Confirm intent before calling."
206 .into(),
207 parameters: json!({
208 "type": "object",
209 "properties": {
210 "id": { "type": "string", "description": "Job id" }
211 },
212 "required": ["id"]
213 }),
214 }
215 }
216}
217#[async_trait]
218impl ToolHandler for PollersResetTool {
219 async fn call(&self, _ctx: &AgentContext, args: Value) -> anyhow::Result<Value> {
220 let id = args["id"]
221 .as_str()
222 .ok_or_else(|| anyhow::anyhow!("pollers_reset requires `id`"))?;
223 self.runner.reset_cursor(id).await?;
224 Ok(json!({"ok": true, "reset": true}))
225 }
226}
227
228struct CustomToolAdapter {
232 runner: Arc<PollerRunner>,
233 inner: Arc<dyn nexo_poller::CustomToolHandler>,
234}
235#[async_trait]
236impl ToolHandler for CustomToolAdapter {
237 async fn call(&self, ctx: &AgentContext, mut args: Value) -> anyhow::Result<Value> {
238 if let Value::Object(map) = &mut args {
244 map.insert("_agent_id".to_string(), Value::String(ctx.agent_id.clone()));
245 }
246 self.inner.call(Arc::clone(&self.runner), args).await
247 }
248}
249
250pub fn register_all(registry: &ToolRegistry, runner: Arc<PollerRunner>) {
254 registry.register(
255 PollersListTool::tool_def(),
256 PollersListTool::new(runner.clone()),
257 );
258 registry.register(
259 PollersShowTool::tool_def(),
260 PollersShowTool::new(runner.clone()),
261 );
262 registry.register(
263 PollersRunTool::tool_def(),
264 PollersRunTool::new(runner.clone()),
265 );
266 registry.register(
267 PollersPauseTool::tool_def(),
268 PollersPauseTool::new(runner.clone()),
269 );
270 registry.register(
271 PollersResumeTool::tool_def(),
272 PollersResumeTool::new(runner.clone()),
273 );
274 registry.register(
275 PollersResetTool::tool_def(),
276 PollersResetTool::new(runner.clone()),
277 );
278
279 for spec in runner.collect_custom_tools() {
282 registry.register(
283 spec.def,
284 CustomToolAdapter {
285 runner: Arc::clone(&runner),
286 inner: spec.handler,
287 },
288 );
289 }
290}