1use std::sync::Arc;
10
11use serde_json::Value;
12
13use crate::plugin::{
14 ModeNativeToolsPlugin, PluginError, PluginFactory, PluginRegistrar, PluginSessionContext,
15 SessionPlugin,
16};
17use crate::tool_dispatch::ToolDispatchContext;
18use crate::{
19 BackgroundTaskKind, BackgroundTaskState, MAX_MONITOR_TIMEOUT_MS, MonitorRunState, MonitorSpec,
20 ProgressSender, ToolContract, ToolDefinition, ToolExecutionMode, ToolManifest, ToolResult,
21};
22
/// Factory for the builtin `task_controls` plugin, which provides `tasks_list` and `tasks_stop`.
#[derive(Default)]
pub struct BuiltinTaskControlsPluginFactory;

impl BuiltinTaskControlsPluginFactory {
    /// Creates the (stateless) factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
32
33impl PluginFactory for BuiltinTaskControlsPluginFactory {
34 fn id(&self) -> &'static str {
35 "task_controls"
36 }
37
38 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
39 Ok(Arc::new(TaskControlsPlugin {
40 enabled: ctx.execution_mode != crate::ExecutionMode::new("rlm"),
41 }))
42 }
43}
44
45struct TaskControlsPlugin {
46 enabled: bool,
47}
48
49impl SessionPlugin for TaskControlsPlugin {
50 fn id(&self) -> &'static str {
51 "task_controls"
52 }
53
54 fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
55 if self.enabled {
56 reg.mode().native_tools(Arc::new(TaskControlsNativeTools))?;
57 }
58 Ok(())
59 }
60}
61
62struct TaskControlsNativeTools;
63
64#[async_trait::async_trait]
65impl ModeNativeToolsPlugin for TaskControlsNativeTools {
66 fn tool_manifests(&self) -> Vec<ToolManifest> {
67 task_control_tool_definitions()
68 .into_iter()
69 .map(|tool| tool.manifest())
70 .collect()
71 }
72
73 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
74 task_control_tool_definitions()
75 .into_iter()
76 .find(|tool| tool.name == name)
77 .map(|tool| Arc::new(tool.contract()))
78 }
79
80 async fn execute(
81 &self,
82 context: &ToolDispatchContext,
83 name: &str,
84 args: &Value,
85 _progress: Option<&ProgressSender>,
86 ) -> Option<ToolResult> {
87 match name {
88 "tasks_list" => Some(execute_tasks_list_tool_call(context).await),
89 "tasks_stop" => Some(execute_tasks_stop_tool_call(context, args).await),
90 _ => None,
91 }
92 }
93}
94
/// Factory for the builtin `monitor_tool` plugin, which provides the `monitor` tool.
#[derive(Default)]
pub struct BuiltinMonitorToolPluginFactory;

impl BuiltinMonitorToolPluginFactory {
    /// Creates the (stateless) factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
104
105impl PluginFactory for BuiltinMonitorToolPluginFactory {
106 fn id(&self) -> &'static str {
107 "monitor_tool"
108 }
109
110 fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
111 Ok(Arc::new(MonitorToolPlugin))
112 }
113}
114
115struct MonitorToolPlugin;
116
117impl SessionPlugin for MonitorToolPlugin {
118 fn id(&self) -> &'static str {
119 "monitor_tool"
120 }
121
122 fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
123 reg.mode().native_tools(Arc::new(MonitorNativeTool))
124 }
125}
126
127struct MonitorNativeTool;
128
129#[async_trait::async_trait]
130impl ModeNativeToolsPlugin for MonitorNativeTool {
131 fn tool_manifests(&self) -> Vec<ToolManifest> {
132 vec![monitor_tool_definition().manifest()]
133 }
134
135 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
136 (name == "monitor").then(|| Arc::new(monitor_tool_definition().contract()))
137 }
138
139 async fn execute(
140 &self,
141 context: &ToolDispatchContext,
142 name: &str,
143 args: &Value,
144 _progress: Option<&ProgressSender>,
145 ) -> Option<ToolResult> {
146 match name {
147 "monitor" => {
148 let spec = match MonitorToolSpec::from_args(args) {
149 Ok(spec) => spec,
150 Err(result) => return Some(result),
151 };
152 Some(execute_monitor_tool_call(context, spec).await)
153 }
154 _ => None,
155 }
156 }
157}
158
159pub fn monitor_tool_definition() -> ToolDefinition {
161 ToolDefinition::raw(
162 "monitor",
163 "Run a background script that turns each stdout line into a new turn wake-up. Use for streaming watches (`tell me every time X happens`); for one-shot `wait until X`, just run the command synchronously instead. In RLM mode this returns a lashlang async handle; use `list_async_handles` to rediscover live monitors and `cancel handle` to stop one.\n\nEvents arrive automatically as user-like input — do not call another tool to collect them. Return your turn after starting the monitor; the runtime wakes a new turn on the first line.\n\n**Pipe guards**\n- Always use `grep --line-buffered` in pipes (otherwise pipe buffering delays events by minutes).\n- Merge stderr into stdout (`cmd 2>&1 | grep ...`) — stderr alone is not observed.\n- In poll loops wrap transient failures (`curl ... || true`) and pick intervals ≥30s for remote APIs, 0.5–1s for local checks.\n\n**Coverage — silence is not success.** Your filter must match every terminal state, not just the happy path. A monitor that greps only for the success marker stays silent through a crashloop, a hang, or an unexpected exit — and silence looks identical to `still running`. If you can't enumerate the failure signatures, broaden the alternation rather than narrow it.\n\nWrong: `tail -f run.log | grep --line-buffered \"elapsed_steps=\"`\nRight: `tail -f run.log | grep -E --line-buffered \"elapsed_steps=|Traceback|Error|FAILED|Killed|OOM\"`\n\nSet `persistent: true` for session-length watches. Timeout → killed; exit ends the watch (exit code is reported).",
164 serde_json::json!({
165 "type": "object",
166 "properties": {
167 "command": {
168 "type": "string",
169 "description": "Shell command or script. Each stdout line is an event; exit ends the watch. Filter with `grep --line-buffered` (or equivalent) so only the lines you'd act on become events — including failure signatures, not just success."
170 },
171 "description": {
172 "type": "string",
173 "description": "Short human-readable description of what you are monitoring (shown in every notification). Be specific — \"errors in deploy.log\" beats \"watching logs\"."
174 },
175 "persistent": {
176 "type": "boolean",
177 "default": false,
178 "description": "Run for the lifetime of the session (no timeout). Use for session-length watches like PR monitoring or log tails."
179 },
180 "timeout_ms": {
181 "type": "number",
182 "minimum": 1,
183 "maximum": MAX_MONITOR_TIMEOUT_MS,
184 "default": 300000,
185 "description": "Kill the monitor after this deadline. Default 300000ms, max 3600000ms. Ignored when persistent is true."
186 }
187 },
188 "required": ["command", "description"],
189 "additionalProperties": false
190 }),
191 serde_json::json!({ "type": "object", "additionalProperties": true }),
192 )
193 .with_examples(vec![
194 r#"monitor(command="tail -f /var/log/app.log | grep -E --line-buffered 'ERROR|Traceback|FAILED'", description="errors in app.log")"#.into(),
195 r#"monitor(command="while true; do curl -sf http://localhost:3000/health && echo ready && break; sleep 2; done", description="local server ready", timeout_ms=300000)"#.into(),
196 ])
197 .with_execution_mode(ToolExecutionMode::Parallel)
198}
199
200pub fn tasks_list_tool_definition() -> ToolDefinition {
201 ToolDefinition::raw(
202 "tasks_list",
203 "List every background task registered for this session — monitors and subagents — with their `task_id`, kind, label, and state. `state` is one of `running`, `idle`, `completed`, `failed`, or `cancelled`. Use this to see what's still running before deciding whether to keep waiting, poll again, or stop something.",
204 ToolDefinition::default_input_schema(),
205 serde_json::json!({ "type": "object", "additionalProperties": true }),
206 )
207 .with_examples(vec!["tasks_list()".into()])
208 .with_execution_mode(ToolExecutionMode::Parallel)
209}
210
211fn task_control_tool_definitions() -> Vec<ToolDefinition> {
212 vec![tasks_list_tool_definition(), tasks_stop_tool_definition()]
213}
214
215pub fn tasks_stop_tool_definition() -> ToolDefinition {
216 ToolDefinition::raw(
217 "tasks_stop",
218 "Cancel a background task by `task_id`. Monitors and subagents return this id when started; `tasks_list` can also rediscover it. For monitors this terminates the process tree; for subagents it closes the subtree (running turns are cancelled, idle sessions closed).",
219 serde_json::json!({
220 "type": "object",
221 "properties": {
222 "task_id": {
223 "type": "string",
224 "description": "Task id returned by `monitor`, `spawn_agent`, or `tasks_list`."
225 }
226 },
227 "required": ["task_id"],
228 "additionalProperties": false
229 }),
230 serde_json::json!({ "type": "object", "additionalProperties": true }),
231 )
232 .with_examples(vec![
233 r#"tasks_stop(task_id="monitor:app-errors")"#.into(),
234 r#"tasks_stop(task_id="subagent:/root/inspect_auth")"#.into(),
235 ])
236 .with_execution_mode(ToolExecutionMode::Parallel)
237}
238
239pub struct MonitorToolSpec {
241 pub command: String,
242 pub description: String,
243 pub persistent: bool,
244 pub timeout_ms: u64,
245}
246
247impl MonitorToolSpec {
248 #[allow(clippy::result_large_err)]
249 pub fn from_args(args: &Value) -> Result<Self, ToolResult> {
250 let command = args
251 .get("command")
252 .and_then(|value| value.as_str())
253 .map(str::trim)
254 .filter(|value| !value.is_empty())
255 .ok_or_else(|| ToolResult::err_fmt("monitor requires `command`"))?;
256 let description = args
257 .get("description")
258 .and_then(|value| value.as_str())
259 .map(str::trim)
260 .filter(|value| !value.is_empty())
261 .ok_or_else(|| ToolResult::err_fmt("monitor requires `description`"))?;
262 let persistent = args
263 .get("persistent")
264 .and_then(|value| value.as_bool())
265 .unwrap_or(false);
266 let timeout_ms = args
267 .get("timeout_ms")
268 .and_then(|value| value.as_u64())
269 .unwrap_or(300_000);
270 if timeout_ms > MAX_MONITOR_TIMEOUT_MS {
271 return Err(ToolResult::err_fmt(format_args!(
272 "monitor timeout_ms must be <= {MAX_MONITOR_TIMEOUT_MS}"
273 )));
274 }
275 Ok(Self {
276 command: command.to_string(),
277 description: description.to_string(),
278 persistent,
279 timeout_ms,
280 })
281 }
282}
283
284pub async fn execute_monitor_tool_call(
285 context: &ToolDispatchContext,
286 spec: MonitorToolSpec,
287) -> ToolResult {
288 let id = uuid::Uuid::new_v4().simple().to_string();
289 let monitor_spec = MonitorSpec {
290 id: id.clone(),
291 command: spec.command,
292 cwd: None,
293 env: Default::default(),
294 persistent: spec.persistent,
295 timeout_ms: spec.timeout_ms,
296 arm_on: Default::default(),
297 wake_policy: Default::default(),
298 restart_on_restore: spec.persistent,
299 };
300 match context
301 .host
302 .start_monitor(&context.session_id, monitor_spec)
303 .await
304 {
305 Ok(snapshot) => {
306 let started = snapshot
307 .monitors
308 .iter()
309 .find(|status| status.spec.id == id)
310 .cloned();
311 match started {
312 Some(status) => ToolResult::ok(serde_json::json!({
313 "task_id": format!("monitor:{}", status.spec.id),
314 "monitor_id": status.spec.id,
315 "description": status.spec.command,
316 "command": status.spec.command,
317 "persistent": status.spec.persistent,
318 "timeout_ms": status.spec.timeout_ms,
319 "state": match status.state {
320 MonitorRunState::Idle => "idle",
321 MonitorRunState::Running => "running",
322 MonitorRunState::Stopped => "stopped",
323 MonitorRunState::Exited => "exited",
324 MonitorRunState::Failed => "failed",
325 },
326 })),
327 None => ToolResult::err_fmt("monitor started but status was unavailable"),
328 }
329 }
330 Err(err) => ToolResult::err_fmt(err.to_string()),
331 }
332}
333
334pub async fn execute_tasks_list_tool_call(context: &ToolDispatchContext) -> ToolResult {
335 match context
336 .host
337 .list_background_tasks(&context.session_id)
338 .await
339 {
340 Ok(tasks) => {
341 let entries: Vec<Value> = tasks
342 .into_iter()
343 .map(|task| {
344 let created_at_iso = chrono::DateTime::<chrono::Utc>::from(task.created_at)
345 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
346 serde_json::json!({
347 "task_id": task.id,
348 "kind": kind_label(task.kind),
349 "producer": task.producer,
350 "state": state_label(task.state),
351 "created_at": created_at_iso,
352 })
353 })
354 .collect();
355 ToolResult::ok(serde_json::json!({ "tasks": entries }))
356 }
357 Err(err) => ToolResult::err_fmt(err.to_string()),
358 }
359}
360
361pub async fn execute_tasks_stop_tool_call(
362 context: &ToolDispatchContext,
363 args: &Value,
364) -> ToolResult {
365 let Some(id) = args
366 .get("task_id")
367 .and_then(|value| value.as_str())
368 .map(str::trim)
369 .filter(|value| !value.is_empty())
370 else {
371 return ToolResult::err_fmt("tasks_stop requires `task_id`");
372 };
373 match context
374 .host
375 .cancel_background_task(&context.session_id, id)
376 .await
377 {
378 Ok(status) => ToolResult::ok(serde_json::json!({
379 "task_id": status.id,
380 "kind": kind_label(status.kind),
381 "state": state_label(status.state),
382 })),
383 Err(err) => ToolResult::err_fmt(err.to_string()),
384 }
385}
386
387fn state_label(state: BackgroundTaskState) -> &'static str {
388 match state {
389 BackgroundTaskState::Pending => "pending",
390 BackgroundTaskState::Running => "running",
391 BackgroundTaskState::Waiting => "idle",
392 BackgroundTaskState::Completed => "completed",
393 BackgroundTaskState::Failed => "failed",
394 BackgroundTaskState::CancelRequested => "cancel_requested",
395 BackgroundTaskState::Cancelled => "cancelled",
396 }
397}
398
399fn kind_label(kind: BackgroundTaskKind) -> &'static str {
400 kind.as_str()
401}