lash-core 0.1.0-alpha.1

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use serde_json::json;

use super::async_handles::{
    AsyncToolHandleEntry, AsyncToolHandleMetadata, AsyncToolHandleNamespace, AsyncToolTerminal,
};
use super::execution_context::ModeExecutionContext;
use super::tool_execution::ModeToolReply;
use crate::ToolCallRecord;
use crate::tool_dispatch::ToolDispatchOutcome;

impl ModeExecutionContext {
    pub(super) async fn live_monitor_tasks(&self) -> Vec<crate::BackgroundTaskRecord> {
        self.dispatch
            .host
            .list_background_tasks(&self.session_id)
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|task| {
                task.kind == crate::BackgroundTaskKind::Monitor && !task.state.is_terminal()
            })
            .collect()
    }

    pub(super) fn background_task_status_value(
        status: &crate::BackgroundTaskRecord,
    ) -> serde_json::Value {
        json!({
            "task_id": status.id,
            "kind": status.kind.as_str(),
            "producer": status.producer,
            "state": match status.state {
                crate::BackgroundTaskState::Pending => "pending",
                crate::BackgroundTaskState::Running => "running",
                crate::BackgroundTaskState::Waiting => "idle",
                crate::BackgroundTaskState::Completed => "completed",
                crate::BackgroundTaskState::Failed => "failed",
                crate::BackgroundTaskState::CancelRequested => "cancel_requested",
                crate::BackgroundTaskState::Cancelled => "cancelled",
            },
        })
    }

    pub(super) fn monitor_handle_identifier(task_id: &str) -> String {
        task_id
            .strip_prefix("monitor:")
            .unwrap_or(task_id)
            .to_string()
    }

    pub(super) fn ensure_monitor_async_handle(
        &self,
        status: &crate::BackgroundTaskRecord,
    ) -> serde_json::Value {
        let mut handles = self
            .async_tool_handles
            .lock()
            .expect("async tool handle map lock");
        handles.entry(status.id.clone()).or_insert_with(|| {
            AsyncToolHandleEntry::empty_monitor(AsyncToolHandleMetadata {
                tool_name: "monitor".to_string(),
                namespace: AsyncToolHandleNamespace::Monitor,
                identifier: Self::monitor_handle_identifier(&status.id),
            })
        });
        Self::async_tool_handle_value(&status.id, "monitor")
    }

    pub(super) async fn start_monitor_handle_call(
        &self,
        call_id: String,
        args: serde_json::Value,
        tc_num: usize,
    ) -> ModeToolReply {
        let executed = self
            .execute_tool_call(call_id, "monitor".to_string(), args, tc_num, None)
            .await;

        let reply = if executed.completed.output.is_success() {
            let task_id = executed
                .completed
                .output
                .value_for_projection()
                .get("task_id")
                .and_then(|value| value.as_str())
                .map(str::to_string);
            match task_id {
                Some(task_id) => {
                    let status = crate::BackgroundTaskRecord::local_session(
                        self.session_id.clone(),
                        task_id.clone(),
                        crate::BackgroundTaskKind::Monitor,
                        "monitor",
                        crate::BackgroundTaskState::Running,
                    );
                    ModeToolReply::success(self.ensure_monitor_async_handle(&status))
                }
                None => ModeToolReply::error(json!("monitor started but did not return a task_id")),
            }
        } else {
            ModeToolReply::from_output(executed.completed.output.clone())
        };

        reply.with_record(executed.record)
    }

    pub(super) fn list_async_handles(
        &self,
        live_monitor_tasks: Vec<crate::BackgroundTaskRecord>,
    ) -> ModeToolReply {
        for task in &live_monitor_tasks {
            self.ensure_monitor_async_handle(task);
        }

        let entries = self
            .async_tool_handles
            .lock()
            .expect("async tool handle map lock")
            .iter()
            .filter_map(|(id, entry)| {
                if entry.metadata.namespace == AsyncToolHandleNamespace::Monitor {
                    return None;
                }
                let is_terminal = entry
                    .state
                    .lock()
                    .expect("async tool state lock")
                    .terminal
                    .is_some();
                (!is_terminal).then(|| (id.clone(), entry.metadata.clone()))
            })
            .collect::<Vec<_>>();

        let mut monitor = serde_json::Map::new();
        let mut subagent = serde_json::Map::new();
        let mut tool = serde_json::Map::new();
        for (id, metadata) in entries {
            let value = Self::async_tool_handle_value(&id, &metadata.tool_name);
            match metadata.namespace {
                AsyncToolHandleNamespace::Monitor => {
                    monitor.insert(metadata.identifier, value);
                }
                AsyncToolHandleNamespace::Subagent => {
                    subagent.insert(metadata.identifier, value);
                }
                AsyncToolHandleNamespace::Tool => {
                    tool.insert(metadata.identifier, value);
                }
            }
        }
        for task in live_monitor_tasks {
            monitor.insert(
                Self::monitor_handle_identifier(&task.id),
                Self::async_tool_handle_value(&task.id, "monitor"),
            );
        }
        ModeToolReply::success(json!({
            "monitor": monitor,
            "subagent": subagent,
            "tool": tool,
        }))
    }

    pub(super) async fn await_monitor_handle(&self, task_id: &str) -> ModeToolReply {
        loop {
            let tasks = match self
                .dispatch
                .host
                .list_background_tasks(&self.session_id)
                .await
            {
                Ok(tasks) => tasks,
                Err(err) => return ModeToolReply::error(json!(err.to_string())),
            };
            let Some(status) = tasks.into_iter().find(|task| task.id == task_id) else {
                return ModeToolReply::error(json!(format!("Unknown monitor handle: {task_id}")));
            };
            if status.state.is_terminal() {
                if let Some(entry) = self
                    .async_tool_handles
                    .lock()
                    .ok()
                    .and_then(|handles| handles.get(task_id).cloned())
                {
                    let mut guard = entry.state.lock().expect("async tool state lock");
                    if guard.terminal.is_none() {
                        guard.terminal = Some(match status.state {
                            crate::BackgroundTaskState::Cancelled => AsyncToolTerminal::Cancelled,
                            crate::BackgroundTaskState::Failed => {
                                AsyncToolTerminal::Failed("monitor failed".to_string())
                            }
                            _ => AsyncToolTerminal::Completed(ToolDispatchOutcome {
                                record: ToolCallRecord {
                                    call_id: None,
                                    tool: "monitor".into(),
                                    args: json!({}),
                                    output: crate::ToolCallOutput::success(
                                        Self::background_task_status_value(&status),
                                    ),
                                    duration_ms: 0,
                                },
                            }),
                        });
                    }
                }
                let value = Self::background_task_status_value(&status);
                return match status.state {
                    crate::BackgroundTaskState::Failed => ModeToolReply::error(value),
                    crate::BackgroundTaskState::Cancelled => {
                        ModeToolReply::cancelled("monitor was cancelled")
                    }
                    _ => ModeToolReply::success(value),
                };
            }
            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
        }
    }

    pub(super) async fn cancel_monitor_handle(&self, task_id: &str) -> ModeToolReply {
        match self
            .dispatch
            .host
            .cancel_background_task(&self.session_id, task_id)
            .await
        {
            Ok(status) => {
                if let Some(entry) = self
                    .async_tool_handles
                    .lock()
                    .ok()
                    .and_then(|handles| handles.get(task_id).cloned())
                {
                    let mut guard = entry.state.lock().expect("async tool state lock");
                    guard.terminal = Some(AsyncToolTerminal::Cancelled);
                    entry.progress_notify.notify_waiters();
                    entry.done_notify.notify_waiters();
                }
                ModeToolReply::success(Self::background_task_status_value(&status))
            }
            Err(err) => ModeToolReply::error(json!(err.to_string())),
        }
    }
}