Skip to main content

lash_core/
runtime_controls.rs

1//! Mode-agnostic runtime-control tools (`monitor`, `tasks_list`,
2//! `tasks_stop`).
3//!
4//! Dedicated plugins register these tools into the native-tools surface,
5//! so mode crates do not own or duplicate runtime control behavior. RLM
6//! hides `tasks_list` / `tasks_stop` because it exposes the same
7//! background control through lashlang async handles.
8
9use std::sync::Arc;
10
11use serde_json::Value;
12
13use crate::plugin::{
14    ModeNativeToolsPlugin, PluginError, PluginFactory, PluginRegistrar, PluginSessionContext,
15    SessionPlugin,
16};
17use crate::tool_dispatch::ToolDispatchContext;
18use crate::{
19    BackgroundTaskKind, BackgroundTaskState, MAX_MONITOR_TIMEOUT_MS, MonitorRunState, MonitorSpec,
20    ProgressSender, ToolContract, ToolDefinition, ToolExecutionMode, ToolManifest, ToolResult,
21};
22
23/// Plugin factory for mode-agnostic task-control tools.
24#[derive(Default)]
25pub struct BuiltinTaskControlsPluginFactory;
26
27impl BuiltinTaskControlsPluginFactory {
28    pub fn new() -> Self {
29        Self
30    }
31}
32
33impl PluginFactory for BuiltinTaskControlsPluginFactory {
34    fn id(&self) -> &'static str {
35        "task_controls"
36    }
37
38    fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
39        Ok(Arc::new(TaskControlsPlugin {
40            enabled: ctx.execution_mode != crate::ExecutionMode::new("rlm"),
41        }))
42    }
43}
44
45struct TaskControlsPlugin {
46    enabled: bool,
47}
48
49impl SessionPlugin for TaskControlsPlugin {
50    fn id(&self) -> &'static str {
51        "task_controls"
52    }
53
54    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
55        if self.enabled {
56            reg.mode().native_tools(Arc::new(TaskControlsNativeTools))?;
57        }
58        Ok(())
59    }
60}
61
62struct TaskControlsNativeTools;
63
64#[async_trait::async_trait]
65impl ModeNativeToolsPlugin for TaskControlsNativeTools {
66    fn tool_manifests(&self) -> Vec<ToolManifest> {
67        task_control_tool_definitions()
68            .into_iter()
69            .map(|tool| tool.manifest())
70            .collect()
71    }
72
73    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
74        task_control_tool_definitions()
75            .into_iter()
76            .find(|tool| tool.name == name)
77            .map(|tool| Arc::new(tool.contract()))
78    }
79
80    async fn execute(
81        &self,
82        context: &ToolDispatchContext,
83        name: &str,
84        args: &Value,
85        _progress: Option<&ProgressSender>,
86    ) -> Option<ToolResult> {
87        match name {
88            "tasks_list" => Some(execute_tasks_list_tool_call(context).await),
89            "tasks_stop" => Some(execute_tasks_stop_tool_call(context, args).await),
90            _ => None,
91        }
92    }
93}
94
95/// Plugin factory for the shell-backed `monitor` tool.
96#[derive(Default)]
97pub struct BuiltinMonitorToolPluginFactory;
98
99impl BuiltinMonitorToolPluginFactory {
100    pub fn new() -> Self {
101        Self
102    }
103}
104
105impl PluginFactory for BuiltinMonitorToolPluginFactory {
106    fn id(&self) -> &'static str {
107        "monitor_tool"
108    }
109
110    fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
111        Ok(Arc::new(MonitorToolPlugin))
112    }
113}
114
115struct MonitorToolPlugin;
116
117impl SessionPlugin for MonitorToolPlugin {
118    fn id(&self) -> &'static str {
119        "monitor_tool"
120    }
121
122    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
123        reg.mode().native_tools(Arc::new(MonitorNativeTool))
124    }
125}
126
127struct MonitorNativeTool;
128
129#[async_trait::async_trait]
130impl ModeNativeToolsPlugin for MonitorNativeTool {
131    fn tool_manifests(&self) -> Vec<ToolManifest> {
132        vec![monitor_tool_definition().manifest()]
133    }
134
135    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
136        (name == "monitor").then(|| Arc::new(monitor_tool_definition().contract()))
137    }
138
139    async fn execute(
140        &self,
141        context: &ToolDispatchContext,
142        name: &str,
143        args: &Value,
144        _progress: Option<&ProgressSender>,
145    ) -> Option<ToolResult> {
146        match name {
147            "monitor" => {
148                let spec = match MonitorToolSpec::from_args(args) {
149                    Ok(spec) => spec,
150                    Err(result) => return Some(result),
151                };
152                Some(execute_monitor_tool_call(context, spec).await)
153            }
154            _ => None,
155        }
156    }
157}
158
159/// Build the `monitor` tool definition.
160pub fn monitor_tool_definition() -> ToolDefinition {
161    ToolDefinition::raw(
162        "monitor",
163        "Run a background script that turns each stdout line into a new turn wake-up. Use for streaming watches (`tell me every time X happens`); for one-shot `wait until X`, just run the command synchronously instead. In RLM mode this returns a lashlang async handle; use `list_async_handles` to rediscover live monitors and `cancel handle` to stop one.\n\nEvents arrive automatically as user-like input — do not call another tool to collect them. Return your turn after starting the monitor; the runtime wakes a new turn on the first line.\n\n**Pipe guards**\n- Always use `grep --line-buffered` in pipes (otherwise pipe buffering delays events by minutes).\n- Merge stderr into stdout (`cmd 2>&1 | grep ...`) — stderr alone is not observed.\n- In poll loops wrap transient failures (`curl ... || true`) and pick intervals ≥30s for remote APIs, 0.5–1s for local checks.\n\n**Coverage — silence is not success.** Your filter must match every terminal state, not just the happy path. A monitor that greps only for the success marker stays silent through a crashloop, a hang, or an unexpected exit — and silence looks identical to `still running`. If you can't enumerate the failure signatures, broaden the alternation rather than narrow it.\n\nWrong: `tail -f run.log | grep --line-buffered \"elapsed_steps=\"`\nRight: `tail -f run.log | grep -E --line-buffered \"elapsed_steps=|Traceback|Error|FAILED|Killed|OOM\"`\n\nSet `persistent: true` for session-length watches. Timeout → killed; exit ends the watch (exit code is reported).",
164        serde_json::json!({
165            "type": "object",
166            "properties": {
167                "command": {
168                    "type": "string",
169                    "description": "Shell command or script. Each stdout line is an event; exit ends the watch. Filter with `grep --line-buffered` (or equivalent) so only the lines you'd act on become events — including failure signatures, not just success."
170                },
171                "description": {
172                    "type": "string",
173                    "description": "Short human-readable description of what you are monitoring (shown in every notification). Be specific — \"errors in deploy.log\" beats \"watching logs\"."
174                },
175                "persistent": {
176                    "type": "boolean",
177                    "default": false,
178                    "description": "Run for the lifetime of the session (no timeout). Use for session-length watches like PR monitoring or log tails."
179                },
180                "timeout_ms": {
181                    "type": "number",
182                    "minimum": 1,
183                    "maximum": MAX_MONITOR_TIMEOUT_MS,
184                    "default": 300000,
185                    "description": "Kill the monitor after this deadline. Default 300000ms, max 3600000ms. Ignored when persistent is true."
186                }
187            },
188            "required": ["command", "description"],
189            "additionalProperties": false
190        }),
191        serde_json::json!({ "type": "object", "additionalProperties": true }),
192    )
193    .with_examples(vec![
194        r#"monitor(command="tail -f /var/log/app.log | grep -E --line-buffered 'ERROR|Traceback|FAILED'", description="errors in app.log")"#.into(),
195        r#"monitor(command="while true; do curl -sf http://localhost:3000/health && echo ready && break; sleep 2; done", description="local server ready", timeout_ms=300000)"#.into(),
196    ])
197    .with_execution_mode(ToolExecutionMode::Parallel)
198}
199
200pub fn tasks_list_tool_definition() -> ToolDefinition {
201    ToolDefinition::raw(
202        "tasks_list",
203        "List every background task registered for this session — monitors and subagents — with their `task_id`, kind, label, and state. `state` is one of `running`, `idle`, `completed`, `failed`, or `cancelled`. Use this to see what's still running before deciding whether to keep waiting, poll again, or stop something.",
204        ToolDefinition::default_input_schema(),
205        serde_json::json!({ "type": "object", "additionalProperties": true }),
206    )
207    .with_examples(vec!["tasks_list()".into()])
208    .with_execution_mode(ToolExecutionMode::Parallel)
209}
210
211fn task_control_tool_definitions() -> Vec<ToolDefinition> {
212    vec![tasks_list_tool_definition(), tasks_stop_tool_definition()]
213}
214
215pub fn tasks_stop_tool_definition() -> ToolDefinition {
216    ToolDefinition::raw(
217        "tasks_stop",
218        "Cancel a background task by `task_id`. Monitors and subagents return this id when started; `tasks_list` can also rediscover it. For monitors this terminates the process tree; for subagents it closes the subtree (running turns are cancelled, idle sessions closed).",
219        serde_json::json!({
220            "type": "object",
221            "properties": {
222                "task_id": {
223                    "type": "string",
224                    "description": "Task id returned by `monitor`, `spawn_agent`, or `tasks_list`."
225                }
226            },
227            "required": ["task_id"],
228            "additionalProperties": false
229        }),
230        serde_json::json!({ "type": "object", "additionalProperties": true }),
231    )
232    .with_examples(vec![
233        r#"tasks_stop(task_id="monitor:app-errors")"#.into(),
234        r#"tasks_stop(task_id="subagent:/root/inspect_auth")"#.into(),
235    ])
236    .with_execution_mode(ToolExecutionMode::Parallel)
237}
238
239/// Parsed `monitor` arguments ready to hand to the session manager.
240pub struct MonitorToolSpec {
241    pub command: String,
242    pub description: String,
243    pub persistent: bool,
244    pub timeout_ms: u64,
245}
246
247impl MonitorToolSpec {
248    #[allow(clippy::result_large_err)]
249    pub fn from_args(args: &Value) -> Result<Self, ToolResult> {
250        let command = args
251            .get("command")
252            .and_then(|value| value.as_str())
253            .map(str::trim)
254            .filter(|value| !value.is_empty())
255            .ok_or_else(|| ToolResult::err_fmt("monitor requires `command`"))?;
256        let description = args
257            .get("description")
258            .and_then(|value| value.as_str())
259            .map(str::trim)
260            .filter(|value| !value.is_empty())
261            .ok_or_else(|| ToolResult::err_fmt("monitor requires `description`"))?;
262        let persistent = args
263            .get("persistent")
264            .and_then(|value| value.as_bool())
265            .unwrap_or(false);
266        let timeout_ms = args
267            .get("timeout_ms")
268            .and_then(|value| value.as_u64())
269            .unwrap_or(300_000);
270        if timeout_ms > MAX_MONITOR_TIMEOUT_MS {
271            return Err(ToolResult::err_fmt(format_args!(
272                "monitor timeout_ms must be <= {MAX_MONITOR_TIMEOUT_MS}"
273            )));
274        }
275        Ok(Self {
276            command: command.to_string(),
277            description: description.to_string(),
278            persistent,
279            timeout_ms,
280        })
281    }
282}
283
284pub async fn execute_monitor_tool_call(
285    context: &ToolDispatchContext,
286    spec: MonitorToolSpec,
287) -> ToolResult {
288    let id = uuid::Uuid::new_v4().simple().to_string();
289    let monitor_spec = MonitorSpec {
290        id: id.clone(),
291        command: spec.command,
292        cwd: None,
293        env: Default::default(),
294        persistent: spec.persistent,
295        timeout_ms: spec.timeout_ms,
296        arm_on: Default::default(),
297        wake_policy: Default::default(),
298        restart_on_restore: spec.persistent,
299    };
300    match context
301        .host
302        .start_monitor(&context.session_id, monitor_spec)
303        .await
304    {
305        Ok(snapshot) => {
306            let started = snapshot
307                .monitors
308                .iter()
309                .find(|status| status.spec.id == id)
310                .cloned();
311            match started {
312                Some(status) => ToolResult::ok(serde_json::json!({
313                    "task_id": format!("monitor:{}", status.spec.id),
314                    "monitor_id": status.spec.id,
315                    "description": status.spec.command,
316                    "command": status.spec.command,
317                    "persistent": status.spec.persistent,
318                    "timeout_ms": status.spec.timeout_ms,
319                    "state": match status.state {
320                        MonitorRunState::Idle => "idle",
321                        MonitorRunState::Running => "running",
322                        MonitorRunState::Stopped => "stopped",
323                        MonitorRunState::Exited => "exited",
324                        MonitorRunState::Failed => "failed",
325                    },
326                })),
327                None => ToolResult::err_fmt("monitor started but status was unavailable"),
328            }
329        }
330        Err(err) => ToolResult::err_fmt(err.to_string()),
331    }
332}
333
334pub async fn execute_tasks_list_tool_call(context: &ToolDispatchContext) -> ToolResult {
335    match context
336        .host
337        .list_background_tasks(&context.session_id)
338        .await
339    {
340        Ok(tasks) => {
341            let entries: Vec<Value> = tasks
342                .into_iter()
343                .map(|task| {
344                    let created_at_iso = chrono::DateTime::<chrono::Utc>::from(task.created_at)
345                        .to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
346                    serde_json::json!({
347                        "task_id": task.id,
348                        "kind": kind_label(task.kind),
349                        "producer": task.producer,
350                        "state": state_label(task.state),
351                        "created_at": created_at_iso,
352                    })
353                })
354                .collect();
355            ToolResult::ok(serde_json::json!({ "tasks": entries }))
356        }
357        Err(err) => ToolResult::err_fmt(err.to_string()),
358    }
359}
360
361pub async fn execute_tasks_stop_tool_call(
362    context: &ToolDispatchContext,
363    args: &Value,
364) -> ToolResult {
365    let Some(id) = args
366        .get("task_id")
367        .and_then(|value| value.as_str())
368        .map(str::trim)
369        .filter(|value| !value.is_empty())
370    else {
371        return ToolResult::err_fmt("tasks_stop requires `task_id`");
372    };
373    match context
374        .host
375        .cancel_background_task(&context.session_id, id)
376        .await
377    {
378        Ok(status) => ToolResult::ok(serde_json::json!({
379            "task_id": status.id,
380            "kind": kind_label(status.kind),
381            "state": state_label(status.state),
382        })),
383        Err(err) => ToolResult::err_fmt(err.to_string()),
384    }
385}
386
387fn state_label(state: BackgroundTaskState) -> &'static str {
388    match state {
389        BackgroundTaskState::Pending => "pending",
390        BackgroundTaskState::Running => "running",
391        BackgroundTaskState::Waiting => "idle",
392        BackgroundTaskState::Completed => "completed",
393        BackgroundTaskState::Failed => "failed",
394        BackgroundTaskState::CancelRequested => "cancel_requested",
395        BackgroundTaskState::Cancelled => "cancelled",
396    }
397}
398
399fn kind_label(kind: BackgroundTaskKind) -> &'static str {
400    kind.as_str()
401}