lash-core 0.1.0-alpha.1

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
//! Mode-agnostic runtime-control tools (`monitor`, `tasks_list`,
//! `tasks_stop`).
//!
//! Dedicated plugins register these tools into the native-tools surface,
//! so mode crates do not own or duplicate runtime control behavior. RLM
//! hides `tasks_list` / `tasks_stop` because it exposes the same
//! background control through lashlang async handles.

use std::sync::Arc;

use serde_json::Value;

use crate::plugin::{
    ModeNativeToolsPlugin, PluginError, PluginFactory, PluginRegistrar, PluginSessionContext,
    SessionPlugin,
};
use crate::tool_dispatch::ToolDispatchContext;
use crate::{
    BackgroundTaskKind, BackgroundTaskState, MAX_MONITOR_TIMEOUT_MS, MonitorRunState, MonitorSpec,
    ProgressSender, ToolContract, ToolDefinition, ToolExecutionMode, ToolManifest, ToolResult,
};

/// Plugin factory for mode-agnostic task-control tools.
#[derive(Default)]
pub struct BuiltinTaskControlsPluginFactory;

impl BuiltinTaskControlsPluginFactory {
    pub fn new() -> Self {
        Self
    }
}

impl PluginFactory for BuiltinTaskControlsPluginFactory {
    fn id(&self) -> &'static str {
        "task_controls"
    }

    fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
        Ok(Arc::new(TaskControlsPlugin {
            enabled: ctx.execution_mode != crate::ExecutionMode::new("rlm"),
        }))
    }
}

struct TaskControlsPlugin {
    enabled: bool,
}

impl SessionPlugin for TaskControlsPlugin {
    fn id(&self) -> &'static str {
        "task_controls"
    }

    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
        if self.enabled {
            reg.mode().native_tools(Arc::new(TaskControlsNativeTools))?;
        }
        Ok(())
    }
}

struct TaskControlsNativeTools;

#[async_trait::async_trait]
impl ModeNativeToolsPlugin for TaskControlsNativeTools {
    fn tool_manifests(&self) -> Vec<ToolManifest> {
        task_control_tool_definitions()
            .into_iter()
            .map(|tool| tool.manifest())
            .collect()
    }

    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
        task_control_tool_definitions()
            .into_iter()
            .find(|tool| tool.name == name)
            .map(|tool| Arc::new(tool.contract()))
    }

    async fn execute(
        &self,
        context: &ToolDispatchContext,
        name: &str,
        args: &Value,
        _progress: Option<&ProgressSender>,
    ) -> Option<ToolResult> {
        match name {
            "tasks_list" => Some(execute_tasks_list_tool_call(context).await),
            "tasks_stop" => Some(execute_tasks_stop_tool_call(context, args).await),
            _ => None,
        }
    }
}

/// Plugin factory for the shell-backed `monitor` tool.
#[derive(Default)]
pub struct BuiltinMonitorToolPluginFactory;

impl BuiltinMonitorToolPluginFactory {
    pub fn new() -> Self {
        Self
    }
}

impl PluginFactory for BuiltinMonitorToolPluginFactory {
    fn id(&self) -> &'static str {
        "monitor_tool"
    }

    fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
        Ok(Arc::new(MonitorToolPlugin))
    }
}

struct MonitorToolPlugin;

impl SessionPlugin for MonitorToolPlugin {
    fn id(&self) -> &'static str {
        "monitor_tool"
    }

    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
        reg.mode().native_tools(Arc::new(MonitorNativeTool))
    }
}

struct MonitorNativeTool;

#[async_trait::async_trait]
impl ModeNativeToolsPlugin for MonitorNativeTool {
    fn tool_manifests(&self) -> Vec<ToolManifest> {
        vec![monitor_tool_definition().manifest()]
    }

    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>> {
        (name == "monitor").then(|| Arc::new(monitor_tool_definition().contract()))
    }

    async fn execute(
        &self,
        context: &ToolDispatchContext,
        name: &str,
        args: &Value,
        _progress: Option<&ProgressSender>,
    ) -> Option<ToolResult> {
        match name {
            "monitor" => {
                let spec = match MonitorToolSpec::from_args(args) {
                    Ok(spec) => spec,
                    Err(result) => return Some(result),
                };
                Some(execute_monitor_tool_call(context, spec).await)
            }
            _ => None,
        }
    }
}

/// Build the `monitor` tool definition.
pub fn monitor_tool_definition() -> ToolDefinition {
    ToolDefinition::raw(
        "monitor",
        "Run a background script that turns each stdout line into a new turn wake-up. Use for streaming watches (`tell me every time X happens`); for one-shot `wait until X`, just run the command synchronously instead. In RLM mode this returns a lashlang async handle; use `list_async_handles` to rediscover live monitors and `cancel handle` to stop one.\n\nEvents arrive automatically as user-like input — do not call another tool to collect them. Return your turn after starting the monitor; the runtime wakes a new turn on the first line.\n\n**Pipe guards**\n- Always use `grep --line-buffered` in pipes (otherwise pipe buffering delays events by minutes).\n- Merge stderr into stdout (`cmd 2>&1 | grep ...`) — stderr alone is not observed.\n- In poll loops wrap transient failures (`curl ... || true`) and pick intervals ≥30s for remote APIs, 0.5–1s for local checks.\n\n**Coverage — silence is not success.** Your filter must match every terminal state, not just the happy path. A monitor that greps only for the success marker stays silent through a crashloop, a hang, or an unexpected exit — and silence looks identical to `still running`. If you can't enumerate the failure signatures, broaden the alternation rather than narrow it.\n\nWrong: `tail -f run.log | grep --line-buffered \"elapsed_steps=\"`\nRight: `tail -f run.log | grep -E --line-buffered \"elapsed_steps=|Traceback|Error|FAILED|Killed|OOM\"`\n\nSet `persistent: true` for session-length watches. Timeout → killed; exit ends the watch (exit code is reported).",
        serde_json::json!({
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "Shell command or script. Each stdout line is an event; exit ends the watch. Filter with `grep --line-buffered` (or equivalent) so only the lines you'd act on become events — including failure signatures, not just success."
                },
                "description": {
                    "type": "string",
                    "description": "Short human-readable description of what you are monitoring (shown in every notification). Be specific — \"errors in deploy.log\" beats \"watching logs\"."
                },
                "persistent": {
                    "type": "boolean",
                    "default": false,
                    "description": "Run for the lifetime of the session (no timeout). Use for session-length watches like PR monitoring or log tails."
                },
                "timeout_ms": {
                    "type": "number",
                    "minimum": 1,
                    "maximum": MAX_MONITOR_TIMEOUT_MS,
                    "default": 300000,
                    "description": "Kill the monitor after this deadline. Default 300000ms, max 3600000ms. Ignored when persistent is true."
                }
            },
            "required": ["command", "description"],
            "additionalProperties": false
        }),
        serde_json::json!({ "type": "object", "additionalProperties": true }),
    )
    .with_examples(vec![
        r#"monitor(command="tail -f /var/log/app.log | grep -E --line-buffered 'ERROR|Traceback|FAILED'", description="errors in app.log")"#.into(),
        r#"monitor(command="while true; do curl -sf http://localhost:3000/health && echo ready && break; sleep 2; done", description="local server ready", timeout_ms=300000)"#.into(),
    ])
    .with_execution_mode(ToolExecutionMode::Parallel)
}

pub fn tasks_list_tool_definition() -> ToolDefinition {
    ToolDefinition::raw(
        "tasks_list",
        "List every background task registered for this session — monitors and subagents — with their `task_id`, kind, label, and state. `state` is one of `running`, `idle`, `completed`, `failed`, or `cancelled`. Use this to see what's still running before deciding whether to keep waiting, poll again, or stop something.",
        ToolDefinition::default_input_schema(),
        serde_json::json!({ "type": "object", "additionalProperties": true }),
    )
    .with_examples(vec!["tasks_list()".into()])
    .with_execution_mode(ToolExecutionMode::Parallel)
}

fn task_control_tool_definitions() -> Vec<ToolDefinition> {
    vec![tasks_list_tool_definition(), tasks_stop_tool_definition()]
}

pub fn tasks_stop_tool_definition() -> ToolDefinition {
    ToolDefinition::raw(
        "tasks_stop",
        "Cancel a background task by `task_id`. Monitors and subagents return this id when started; `tasks_list` can also rediscover it. For monitors this terminates the process tree; for subagents it closes the subtree (running turns are cancelled, idle sessions closed).",
        serde_json::json!({
            "type": "object",
            "properties": {
                "task_id": {
                    "type": "string",
                    "description": "Task id returned by `monitor`, `spawn_agent`, or `tasks_list`."
                }
            },
            "required": ["task_id"],
            "additionalProperties": false
        }),
        serde_json::json!({ "type": "object", "additionalProperties": true }),
    )
    .with_examples(vec![
        r#"tasks_stop(task_id="monitor:app-errors")"#.into(),
        r#"tasks_stop(task_id="subagent:/root/inspect_auth")"#.into(),
    ])
    .with_execution_mode(ToolExecutionMode::Parallel)
}

/// Parsed `monitor` arguments ready to hand to the session manager.
pub struct MonitorToolSpec {
    pub command: String,
    pub description: String,
    pub persistent: bool,
    pub timeout_ms: u64,
}

impl MonitorToolSpec {
    #[allow(clippy::result_large_err)]
    pub fn from_args(args: &Value) -> Result<Self, ToolResult> {
        let command = args
            .get("command")
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| ToolResult::err_fmt("monitor requires `command`"))?;
        let description = args
            .get("description")
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| ToolResult::err_fmt("monitor requires `description`"))?;
        let persistent = args
            .get("persistent")
            .and_then(|value| value.as_bool())
            .unwrap_or(false);
        let timeout_ms = args
            .get("timeout_ms")
            .and_then(|value| value.as_u64())
            .unwrap_or(300_000);
        if timeout_ms > MAX_MONITOR_TIMEOUT_MS {
            return Err(ToolResult::err_fmt(format_args!(
                "monitor timeout_ms must be <= {MAX_MONITOR_TIMEOUT_MS}"
            )));
        }
        Ok(Self {
            command: command.to_string(),
            description: description.to_string(),
            persistent,
            timeout_ms,
        })
    }
}

pub async fn execute_monitor_tool_call(
    context: &ToolDispatchContext,
    spec: MonitorToolSpec,
) -> ToolResult {
    let id = uuid::Uuid::new_v4().simple().to_string();
    let monitor_spec = MonitorSpec {
        id: id.clone(),
        command: spec.command,
        cwd: None,
        env: Default::default(),
        persistent: spec.persistent,
        timeout_ms: spec.timeout_ms,
        arm_on: Default::default(),
        wake_policy: Default::default(),
        restart_on_restore: spec.persistent,
    };
    match context
        .host
        .start_monitor(&context.session_id, monitor_spec)
        .await
    {
        Ok(snapshot) => {
            let started = snapshot
                .monitors
                .iter()
                .find(|status| status.spec.id == id)
                .cloned();
            match started {
                Some(status) => ToolResult::ok(serde_json::json!({
                    "task_id": format!("monitor:{}", status.spec.id),
                    "monitor_id": status.spec.id,
                    "description": status.spec.command,
                    "command": status.spec.command,
                    "persistent": status.spec.persistent,
                    "timeout_ms": status.spec.timeout_ms,
                    "state": match status.state {
                        MonitorRunState::Idle => "idle",
                        MonitorRunState::Running => "running",
                        MonitorRunState::Stopped => "stopped",
                        MonitorRunState::Exited => "exited",
                        MonitorRunState::Failed => "failed",
                    },
                })),
                None => ToolResult::err_fmt("monitor started but status was unavailable"),
            }
        }
        Err(err) => ToolResult::err_fmt(err.to_string()),
    }
}

pub async fn execute_tasks_list_tool_call(context: &ToolDispatchContext) -> ToolResult {
    match context
        .host
        .list_background_tasks(&context.session_id)
        .await
    {
        Ok(tasks) => {
            let entries: Vec<Value> = tasks
                .into_iter()
                .map(|task| {
                    let created_at_iso = chrono::DateTime::<chrono::Utc>::from(task.created_at)
                        .to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
                    serde_json::json!({
                        "task_id": task.id,
                        "kind": kind_label(task.kind),
                        "producer": task.producer,
                        "state": state_label(task.state),
                        "created_at": created_at_iso,
                    })
                })
                .collect();
            ToolResult::ok(serde_json::json!({ "tasks": entries }))
        }
        Err(err) => ToolResult::err_fmt(err.to_string()),
    }
}

pub async fn execute_tasks_stop_tool_call(
    context: &ToolDispatchContext,
    args: &Value,
) -> ToolResult {
    let Some(id) = args
        .get("task_id")
        .and_then(|value| value.as_str())
        .map(str::trim)
        .filter(|value| !value.is_empty())
    else {
        return ToolResult::err_fmt("tasks_stop requires `task_id`");
    };
    match context
        .host
        .cancel_background_task(&context.session_id, id)
        .await
    {
        Ok(status) => ToolResult::ok(serde_json::json!({
            "task_id": status.id,
            "kind": kind_label(status.kind),
            "state": state_label(status.state),
        })),
        Err(err) => ToolResult::err_fmt(err.to_string()),
    }
}

fn state_label(state: BackgroundTaskState) -> &'static str {
    match state {
        BackgroundTaskState::Pending => "pending",
        BackgroundTaskState::Running => "running",
        BackgroundTaskState::Waiting => "idle",
        BackgroundTaskState::Completed => "completed",
        BackgroundTaskState::Failed => "failed",
        BackgroundTaskState::CancelRequested => "cancel_requested",
        BackgroundTaskState::Cancelled => "cancelled",
    }
}

fn kind_label(kind: BackgroundTaskKind) -> &'static str {
    kind.as_str()
}