use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tracing::{info, warn};
use uuid::Uuid;
use super::process_control::{configure_command_for_process_group, send_sigkill, send_sigterm};
use super::{
command_risk::{PermissionMode, RiskLevel},
daemon_guard::detect_daemonization_primitives,
};
use crate::agent::{
derive_executor_step_result, persist_executor_handoff_context, persist_executor_result_context,
ExecutorHandoff, TargetScope, TaskValidationOutcome,
};
use crate::channels::ChannelHub;
use crate::config::CliAgentsConfig;
use crate::llm_runtime::SharedLlmRuntime;
use crate::tools::terminal::ApprovalRequest;
use crate::tools::ApprovalBroker;
use crate::traits::{
DynamicCliAgent, ModelProvider, StateStore, Tool, ToolCapabilities, ToolTargetHint,
ToolTargetHintKind,
};
use crate::types::ApprovalResponse;
use crate::types::StatusUpdate;
use crate::utils::{truncate_str, truncate_with_note};
/// Buffer cap in bytes (1 MiB). Its consumer is outside this view — presumably captured CLI
/// output; TODO confirm.
const BUFFER_CAP: usize = 1 << 20;
/// Interval between progress updates — usage not visible here; presumably streaming status.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(2_000);
/// Number of most recent non-empty output lines `LoopDetector` keeps in its window.
const LOOP_DETECTION_WINDOW: usize = 100;
/// Occurrences of one line within the window at which `LoopDetector` reports a loop.
const LOOP_DETECTION_THRESHOLD: usize = 50;
/// Initial permit count of the CLI-agent concurrency semaphore.
const DEFAULT_MAX_CONCURRENT: usize = 3;
/// Upper bound, in bytes, for the enriched prompt's base parts plus context sections.
const MAX_PROMPT_SIZE: usize = 16 * 1024;
/// Size passed to `truncate_with_note` when embedding git diffs in a result.
const MAX_DIFF_SIZE: usize = 4 * 1024;
/// Detects runaway output loops by counting how often each trimmed, non-empty line occurs
/// within a sliding window of the last `LOOP_DETECTION_WINDOW` such lines.
struct LoopDetector {
    /// Hashes of the most recent non-empty lines, oldest at the front. A deque makes evicting
    /// the oldest entry O(1); `Vec::remove(0)` shifted the whole window on every line.
    recent_lines: std::collections::VecDeque<u64>,
    /// Occurrence count of each hash currently in `recent_lines` (zero counts are removed).
    line_counts: HashMap<u64, usize>,
}
impl LoopDetector {
    fn new() -> Self {
        Self {
            // +1 because the window briefly holds one extra entry before eviction.
            recent_lines: std::collections::VecDeque::with_capacity(LOOP_DETECTION_WINDOW + 1),
            line_counts: HashMap::new(),
        }
    }

    /// Records one output line and returns `true` if any line now occurs at least
    /// `LOOP_DETECTION_THRESHOLD` times within the window.
    ///
    /// Lines are compared after trimming surrounding whitespace. Blank lines are ignored and
    /// always return `false`.
    fn add_line(&mut self, line: &str) -> bool {
        let normalized = line.trim();
        if normalized.is_empty() {
            return false;
        }
        let mut hasher = DefaultHasher::new();
        normalized.hash(&mut hasher);
        let hash = hasher.finish();
        self.recent_lines.push_back(hash);
        *self.line_counts.entry(hash).or_insert(0) += 1;
        if self.recent_lines.len() > LOOP_DETECTION_WINDOW {
            if let Some(old_hash) = self.recent_lines.pop_front() {
                if let Some(count) = self.line_counts.get_mut(&old_hash) {
                    *count -= 1;
                    if *count == 0 {
                        self.line_counts.remove(&old_hash);
                    }
                }
            }
        }
        self.line_counts
            .values()
            .any(|&count| count >= LOOP_DETECTION_THRESHOLD)
    }

    /// Highest occurrence count of any single line in the current window, or `None` when no
    /// lines are tracked.
    fn get_loop_pattern(&self) -> Option<usize> {
        self.line_counts.values().max().copied()
    }
}
/// Reports whether a process with id `pid` exists, by probing it with `kill -0`.
///
/// Returns `false` when the probe fails or `kill` cannot be spawned.
/// - `pid == 0` is rejected: `kill -0 0` targets the caller's own process group and always
///   succeeds.
/// - Pids above `i32::MAX` are rejected: they wrap to negative values, which `kill` treats as
///   process-group or broadcast targets.
///
/// The probe's stdio is discarded. Otherwise every dead pid prints "No such process" to the
/// daemon's stderr.
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};
    if pid == 0 || i32::try_from(pid).is_err() {
        return false;
    }
    Command::new("kill")
        .args(["-0", &pid.to_string()])
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .map(|s| s.success())
        .unwrap_or(false)
}
/// Non-Unix fallback: liveness cannot be probed here, so every process is assumed alive.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
    true
}
/// Stops `pid`: sends SIGTERM via `send_sigterm`, waits a fixed two-second grace period, then
/// sends SIGKILL via `send_sigkill` if `is_process_alive` still reports the process.
///
/// A pid of 0 is ignored. Signal-delivery errors are deliberately discarded (best effort).
#[cfg(unix)]
async fn kill_process(pid: u32) {
    const GRACE_PERIOD: Duration = Duration::from_secs(2);
    if pid != 0 {
        let _ = send_sigterm(pid);
        tokio::time::sleep(GRACE_PERIOD).await;
        if is_process_alive(pid) {
            let _ = send_sigkill(pid);
        }
    }
}
/// Non-Unix fallback: no signalling is implemented, so this does nothing.
#[cfg(not(unix))]
async fn kill_process(_pid: u32) {}
/// A registered CLI agent: how to launch it and the limits applied to its runs.
struct CliToolEntry {
    /// Executable name or path.
    command: String,
    /// Base arguments passed before any per-run additions.
    args: Vec<String>,
    /// Human-readable description shown in listings.
    description: String,
    /// Per-run timeout.
    timeout: Duration,
    /// Output size limit for this tool.
    max_output_chars: usize,
    /// `true` for agents added at runtime and persisted in the state store; `false` for
    /// agents discovered at startup.
    is_dynamic: bool,
}
/// Bookkeeping for a CLI agent process that is currently running.
struct RunningCliAgent {
    /// Name of the CLI tool that was launched.
    tool_name: String,
    /// Short preview of the prompt.
    prompt_summary: String,
    /// Launch time; used to report the elapsed time on completion.
    started_at: Instant,
    /// Shared display buffer — filled by code outside this view (presumably progress text).
    display_buf: Arc<Mutex<String>>,
    /// Shared buffer of the agent's stdout; summarised into the final result.
    stdout_buf: Arc<Mutex<String>>,
    /// OS process id, probed with `is_process_alive` when reaping.
    child_id: u32,
    /// Session the run belongs to.
    session_id: String,
    /// Delegated task record associated with this run, if any.
    delegated_task_id: Option<String>,
    /// Normalized working directory; its claim is released and its git diff captured on completion.
    working_dir: Option<String>,
}
/// Final report of a finished CLI agent run, kept for later retrieval.
struct CompletedCliAgent {
    /// Final report text.
    result: String,
    /// When the run was recorded as finished; drives age-based pruning.
    completed_at: Instant,
}

/// Exclusive claim on a working directory held by one CLI agent task.
struct WorkingDirClaim {
    /// Task that owns the claim; only this task's release removes it.
    task_id: String,
    /// CLI tool running the task.
    tool_name: String,
    /// Short prompt preview, shown when another dispatch to the directory is blocked.
    prompt_summary: String,
    /// Lower-cased prompt prefix used for similarity checks.
    dedup_prompt: String,
}

/// Working-directory claims keyed by normalized directory path.
type WorkingDirClaims = std::sync::Mutex<HashMap<String, WorkingDirClaim>>;

/// Locks the claim map.
///
/// If a previous holder panicked, the guard is recovered from the poison error instead of
/// propagating the panic, so claims stay usable.
fn lock_claims(
    claims: &WorkingDirClaims,
) -> std::sync::MutexGuard<'_, HashMap<String, WorkingDirClaim>> {
    match claims.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}

/// Releases a working-directory claim when dropped, unless it has been disarmed.
///
/// Only removes the claim if it is still owned by `task_id`. A claim under the same directory
/// owned by another task is left untouched.
struct WorkingDirClaimGuard {
    claims: Arc<WorkingDirClaims>,
    dir: String,
    task_id: String,
    /// When `false`, dropping the guard leaves the claim in place.
    armed: bool,
}
impl WorkingDirClaimGuard {
    /// Creates an armed guard for `dir` owned by `task_id`.
    fn new(claims: Arc<WorkingDirClaims>, dir: String, task_id: String) -> Self {
        Self {
            claims,
            dir,
            task_id,
            armed: true,
        }
    }

    /// Stops this guard from releasing the claim on drop (presumably because responsibility
    /// for the claim has moved elsewhere).
    fn disarm(&mut self) {
        self.armed = false;
    }
}
impl Drop for WorkingDirClaimGuard {
    fn drop(&mut self) {
        if self.armed {
            let mut claims = lock_claims(&self.claims);
            if claims
                .get(&self.dir)
                .is_some_and(|claim| claim.task_id == self.task_id)
            {
                claims.remove(&self.dir);
            }
        }
    }
}
/// Jaccard similarity (0.0–1.0) of the case-insensitive word bigrams of `a` and `b`.
///
/// - A text with a single word is represented by that word paired with an empty string.
/// - Two texts without words are identical (1.0).
/// - If exactly one of them has no words, they share nothing (0.0).
fn prompt_similarity(a: &str, b: &str) -> f64 {
    /// Word bigrams of `text`, with the single-word special case described above.
    fn word_pairs(text: &str) -> HashSet<(String, String)> {
        let tokens: Vec<&str> = text.split_whitespace().collect();
        match tokens.as_slice() {
            [] => HashSet::new(),
            [only] => std::iter::once((only.to_string(), String::new())).collect(),
            _ => tokens
                .windows(2)
                .map(|pair| (pair[0].to_string(), pair[1].to_string()))
                .collect(),
        }
    }

    let left = word_pairs(&a.to_lowercase());
    let right = word_pairs(&b.to_lowercase());
    match (left.is_empty(), right.is_empty()) {
        (true, true) => 1.0,
        (true, false) | (false, true) => 0.0,
        (false, false) => {
            // Both sets are non-empty here, so the union is never empty.
            let shared = left.intersection(&right).count() as f64;
            let total = left.union(&right).count() as f64;
            shared / total
        }
    }
}
/// Normalized key for duplicate-task detection: the first 200 characters of `prompt`,
/// lower-cased.
fn make_dedup_prompt(prompt: &str) -> String {
    // Byte offset where the 201st character starts (or the end of the string).
    let head_end = prompt
        .char_indices()
        .nth(200)
        .map_or(prompt.len(), |(idx, _)| idx);
    prompt[..head_end].to_lowercase()
}
/// Tool that delegates tasks to external AI coding CLIs (claude, gemini, codex, ...).
pub struct CliAgentTool {
    /// Registered CLI tools keyed by name: discovered at startup plus dynamic ones.
    tools: Arc<std::sync::RwLock<HashMap<String, CliToolEntry>>>,
    /// Sorted tool names, kept in step with `tools`.
    tool_names: Arc<std::sync::RwLock<Vec<String>>>,
    /// Agents currently running, keyed by task id.
    running: Arc<Mutex<HashMap<String, RunningCliAgent>>>,
    /// Results of recently finished agents keyed by task id (pruned by age and count).
    completed: Arc<Mutex<HashMap<String, CompletedCliAgent>>>,
    /// Per-directory claims that keep two agents out of the same working directory.
    working_dir_claims: Arc<WorkingDirClaims>,
    /// Persistent store for tasks, history, facts, goals and dynamic agents.
    state: Arc<dyn StateStore>,
    /// Retained but not read (hence the allow).
    #[allow(dead_code)]
    llm_runtime: SharedLlmRuntime,
    /// Timeout applied when a tool has no override.
    default_timeout: Duration,
    /// Output limit applied when a tool has no override.
    default_max_output: usize,
    /// Configured concurrency limit (mirrors the semaphore's initial permits).
    max_concurrent: usize,
    /// Semaphore bounding concurrent agent runs.
    concurrency_limiter: Arc<Semaphore>,
    /// Channel for asking the user to approve risky runs.
    approval_tx: ApprovalBroker,
    /// Channel hub, set once after construction; held weakly.
    hub: OnceLock<Weak<ChannelHub>>,
}
/// Built-in CLI agents probed when no tools are configured.
///
/// Each entry is `(name, command, base args, description)`. Every tool is started in its
/// auto-approve / no-sandbox mode.
fn default_tool_definitions() -> Vec<(&'static str, &'static str, Vec<&'static str>, &'static str)>
{
    type Definition = (&'static str, &'static str, &'static [&'static str], &'static str);
    const DEFINITIONS: [Definition; 5] = [
        (
            "claude",
            "claude",
            &[
                "-p",
                "--dangerously-skip-permissions",
                "--output-format",
                "stream-json",
                "--verbose",
            ],
            "Claude Code — Anthropic's AI coding agent (auto-approve mode)",
        ),
        (
            "gemini",
            "gemini",
            &["--sandbox=false", "--yolo", "--output-format", "stream-json"],
            "Gemini CLI — Google's AI coding agent (auto-approve mode)",
        ),
        (
            "codex",
            "codex",
            &["exec", "--json", "--dangerously-bypass-approvals-and-sandbox"],
            "Codex CLI — OpenAI's AI coding agent (auto-approve mode)",
        ),
        (
            "copilot",
            "copilot",
            &["-p", "--allow-all-tools", "--allow-all-paths"],
            "GitHub Copilot CLI (auto-approve mode)",
        ),
        (
            "aider",
            "aider",
            &["--yes", "--message"],
            "Aider — AI pair programming",
        ),
    ];
    DEFINITIONS
        .iter()
        .map(|&(name, command, args, description)| (name, command, args.to_vec(), description))
        .collect()
}
/// Preference order for choosing the default CLI agent. `tool_priority` ranks a tool by its
/// (ASCII case-insensitive) index here; names not listed rank after all of these.
const DEFAULT_TOOL_PRIORITY: &[&str] = &["claude", "gemini", "codex", "copilot", "aider"];
/// Returns `true` when `command` resolves to an executable file.
///
/// - A command with a directory component (e.g. `./bin/tool`, `/usr/bin/git`) is checked
///   directly.
/// - A bare name is looked up in each `PATH` directory, mirroring how `which` resolves it.
/// - On Windows the `PATHEXT` extensions are also tried.
///
/// Resolution is done in-process rather than by spawning `which`. That binary is not installed
/// everywhere (minimal container images, Windows), and without it every CLI agent looked
/// uninstalled.
async fn command_exists(command: &str) -> bool {
    /// `true` if `path` is a regular file (symlinks followed) and, on Unix, has an execute bit.
    async fn is_executable_file(path: &std::path::Path) -> bool {
        let Ok(meta) = tokio::fs::metadata(path).await else {
            return false;
        };
        if !meta.is_file() {
            return false;
        }
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            if meta.permissions().mode() & 0o111 == 0 {
                return false;
            }
        }
        true
    }

    if command.is_empty() {
        return false;
    }

    // File names to try. On Windows executables are normally reached through a PATHEXT
    // extension ("claude" -> "claude.exe" / "claude.cmd").
    #[cfg(windows)]
    let names: Vec<String> = {
        let extensions =
            std::env::var("PATHEXT").unwrap_or_else(|_| ".COM;.EXE;.BAT;.CMD".to_string());
        std::iter::once(command.to_string())
            .chain(
                extensions
                    .split(';')
                    .filter(|ext| !ext.is_empty())
                    .map(|ext| format!("{command}{ext}")),
            )
            .collect()
    };
    #[cfg(not(windows))]
    let names: Vec<String> = vec![command.to_string()];

    // Like `which`/execvp: anything with a directory component is not searched on PATH.
    if std::path::Path::new(command).components().count() > 1 {
        for name in &names {
            if is_executable_file(std::path::Path::new(name)).await {
                return true;
            }
        }
        return false;
    }

    let Some(path_var) = std::env::var_os("PATH") else {
        return false;
    };
    for dir in std::env::split_paths(&path_var) {
        for name in &names {
            if is_executable_file(&dir.join(name)).await {
                return true;
            }
        }
    }
    false
}
impl CliAgentTool {
async fn persist_delegated_cli_result_with_state(
state: Arc<dyn StateStore>,
delegated_task_id: &str,
response: Option<&str>,
error: Option<&str>,
) {
let latest_task = state.get_task(delegated_task_id).await.ok().flatten();
let structured =
derive_executor_step_result(delegated_task_id, latest_task.as_ref(), response, error);
let task_lead_summary = structured.render_task_lead_summary();
if let Ok(Some(mut task)) = state.get_task(delegated_task_id).await {
if let Ok(context) =
persist_executor_result_context(task.context.as_deref(), &structured)
{
task.context = Some(context);
}
match error {
Some(error) => {
task.status = "failed".to_string();
task.error = Some(error.to_string());
task.completed_at = Some(chrono::Utc::now().to_rfc3339());
}
None => {
if matches!(structured.task_outcome, TaskValidationOutcome::TaskDone) {
if task
.result
.as_deref()
.is_none_or(|result| result.trim().is_empty())
{
if let Some(response) = response {
task.result = Some(response.to_string());
} else {
task.result = Some(structured.summary.clone());
}
}
task.status = "completed".to_string();
task.blocker = None;
} else {
task.result = Some(task_lead_summary.clone());
task.status = "blocked".to_string();
task.blocker = structured
.blocker
.clone()
.or_else(|| structured.exact_need.clone())
.or_else(|| Some(structured.summary.clone()));
}
task.completed_at = Some(chrono::Utc::now().to_rfc3339());
}
}
let _ = state.update_task(&task).await;
}
}
/// Bounds the completed-results map.
///
/// Drops entries older than 10 minutes, then, if more than 128 remain, removes the oldest until
/// exactly 128 are left.
fn prune_completed_map(completed: &mut HashMap<String, CompletedCliAgent>) {
    const COMPLETED_TTL: Duration = Duration::from_secs(10 * 60);
    const COMPLETED_CAP: usize = 128;
    completed.retain(|_, entry| entry.completed_at.elapsed() <= COMPLETED_TTL);
    let overflow = completed.len().saturating_sub(COMPLETED_CAP);
    if overflow == 0 {
        return;
    }
    let mut oldest_first: Vec<(Instant, String)> = completed
        .iter()
        .map(|(task_id, entry)| (entry.completed_at, task_id.clone()))
        .collect();
    oldest_first.sort_unstable_by_key(|(finished_at, _)| *finished_at);
    for (_, task_id) in oldest_first.into_iter().take(overflow) {
        completed.remove(&task_id);
    }
}
async fn build_finished_result(agent: &RunningCliAgent) -> String {
let elapsed = agent.started_at.elapsed().as_secs();
let stdout_output = agent.stdout_buf.lock().await.clone();
let result = extract_meaningful_output(&stdout_output, 10000);
let diff_section = if let Some(ref dir) = agent.working_dir {
Self::capture_git_diff(dir)
.await
.map(|diff| format!("\n\n## File Changes\n```diff\n{}\n```", diff))
} else {
None
};
let mut final_result = format!(
"CLI agent '{}' finished after {}s.\n\nResult:\n{}",
agent.tool_name, elapsed, result
);
if let Some(diff) = diff_section {
final_result.push_str(&diff);
}
final_result
}
/// Rank of `name` in `DEFAULT_TOOL_PRIORITY`, compared after ASCII lower-casing.
///
/// Lower ranks are preferred. Unknown names get `DEFAULT_TOOL_PRIORITY.len()`, after every
/// known tool.
fn tool_priority(name: &str) -> usize {
    let wanted = name.to_ascii_lowercase();
    match DEFAULT_TOOL_PRIORITY
        .iter()
        .position(|&known| known == wanted)
    {
        Some(rank) => rank,
        None => DEFAULT_TOOL_PRIORITY.len(),
    }
}
/// Name of the preferred registered tool.
///
/// Picks the lowest `tool_priority`, with ties broken alphabetically. Returns `None` when no
/// tools are registered.
fn default_tool_name(&self) -> Option<String> {
    let tools = self.tools.read().unwrap();
    tools
        .keys()
        .min_by(|a, b| {
            Self::tool_priority(a)
                .cmp(&Self::tool_priority(b))
                .then_with(|| a.cmp(b))
        })
        .cloned()
}
/// `true` when the role is "owner", compared ASCII case-insensitively.
fn is_owner_role(user_role: Option<&str>) -> bool {
    matches!(user_role, Some(role) if role.eq_ignore_ascii_case("owner"))
}
/// Expands a leading `~` and canonicalizes the path.
///
/// Falls back to the tilde-expanded string when canonicalization fails (for example, the path
/// does not exist).
fn normalize_working_dir(dir: &str) -> String {
    let expanded = shellexpand::tilde(dir).into_owned();
    match std::fs::canonicalize(&expanded) {
        Ok(resolved) => resolved.to_string_lossy().into_owned(),
        Err(_) => expanded,
    }
}
/// Removes the claim on `dir` only if it is owned by `task_id`.
fn release_working_dir_claim(&self, dir: &str, task_id: &str) {
    let mut claims = lock_claims(&self.working_dir_claims);
    if claims
        .get(dir)
        .is_some_and(|claim| claim.task_id == task_id)
    {
        claims.remove(dir);
    }
}
async fn request_daemonization_approval(
&self,
session_id: &str,
tool_name: &str,
prompt: &str,
hits: &[&str],
) -> anyhow::Result<ApprovalResponse> {
let prompt_preview: String = prompt.chars().take(180).collect();
let command = format!(
"cli_agent '{}' requested detached/background execution markers: {}. Prompt preview: {}",
tool_name,
hits.join(", "),
prompt_preview
);
let warnings = vec![
format!("Daemonization primitives detected: {}", hits.join(", ")),
"Detached/background processes may survive cancellation and continue running."
.to_string(),
];
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
self.approval_tx
.send(ApprovalRequest {
command,
session_id: session_id.to_string(),
risk_level: RiskLevel::Critical,
warnings,
permission_mode: PermissionMode::Default,
response_tx,
kind: Default::default(),
})
.await
.map_err(|_| anyhow::anyhow!("Approval channel closed"))?;
match tokio::time::timeout(std::time::Duration::from_secs(300), response_rx).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(_)) => Ok(ApprovalResponse::Deny),
Err(_) => Ok(ApprovalResponse::Deny),
}
}
/// Builds the tool from configuration.
///
/// Steps:
/// 1. Take the configured tools, or the built-in defaults when none are configured.
/// 2. Keep only candidates whose command exists; all lookups run concurrently.
/// 3. Apply per-tool timeout and output overrides, falling back to the configured defaults.
/// 4. Load enabled dynamic agents from the state store.
pub async fn discover(
    config: CliAgentsConfig,
    state: Arc<dyn StateStore>,
    llm_runtime: SharedLlmRuntime,
    approval_tx: ApprovalBroker,
) -> Self {
    /// (name, command, args, description, timeout override in seconds, max-output override)
    type ToolCandidate = (
        String,
        String,
        Vec<String>,
        String,
        Option<u64>,
        Option<usize>,
    );
    let default_timeout = Duration::from_secs(config.timeout_secs);
    let default_max_output = config.max_output_chars;

    let candidates: Vec<ToolCandidate> = if config.tools.is_empty() {
        default_tool_definitions()
            .into_iter()
            .map(|(name, cmd, args, desc)| {
                (
                    name.to_string(),
                    cmd.to_string(),
                    args.into_iter().map(String::from).collect(),
                    desc.to_string(),
                    None,
                    None,
                )
            })
            .collect()
    } else {
        config
            .tools
            .iter()
            .map(|(name, tool_cfg)| {
                (
                    name.clone(),
                    tool_cfg.command.clone(),
                    tool_cfg.args.clone(),
                    tool_cfg.description.clone(),
                    tool_cfg.timeout_secs,
                    tool_cfg.max_output_chars,
                )
            })
            .collect()
    };

    let presence = futures::future::join_all(
        candidates
            .iter()
            .map(|(_, command, ..)| command_exists(command)),
    )
    .await;

    let mut tools = HashMap::new();
    for ((name, command, args, description, timeout_override, max_output_override), found) in
        candidates.into_iter().zip(presence)
    {
        if !found {
            info!(name = %name, command = %command, "CLI agent tool not found, skipping");
            continue;
        }
        info!(name = %name, command = %command, "CLI agent tool discovered");
        let entry = CliToolEntry {
            command,
            args,
            description,
            timeout: timeout_override.map_or(default_timeout, Duration::from_secs),
            max_output_chars: max_output_override.unwrap_or(default_max_output),
            is_dynamic: false,
        };
        tools.insert(name, entry);
    }

    let mut tool_names: Vec<String> = tools.keys().cloned().collect();
    tool_names.sort();
    let tool = CliAgentTool {
        tools: Arc::new(std::sync::RwLock::new(tools)),
        tool_names: Arc::new(std::sync::RwLock::new(tool_names)),
        running: Arc::new(Mutex::new(HashMap::new())),
        completed: Arc::new(Mutex::new(HashMap::new())),
        working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
        state,
        llm_runtime,
        default_timeout,
        default_max_output,
        max_concurrent: DEFAULT_MAX_CONCURRENT,
        concurrency_limiter: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT)),
        approval_tx,
        hub: OnceLock::new(),
    };
    tool.load_dynamic_agents().await;
    tool
}
/// Stores the channel hub. Only the first call has an effect; later hubs are dropped.
pub fn set_hub(&self, hub: Weak<ChannelHub>) {
    let _ = self.hub.get_or_init(|| hub);
}
/// The channel hub, if one was set and it is still alive.
fn get_hub(&self) -> Option<Arc<ChannelHub>> {
    self.hub.get()?.upgrade()
}
/// Registers every enabled dynamic CLI agent from the state store whose command exists.
///
/// - Agents whose command is missing are skipped (logged).
/// - A malformed `args_json` is logged and the agent is registered with no arguments; this
///   used to happen silently.
/// - A store error is logged and nothing is loaded.
async fn load_dynamic_agents(&self) {
    let agents = match self.state.list_dynamic_cli_agents().await {
        Ok(agents) => agents,
        Err(e) => {
            warn!("Failed to load dynamic CLI agents: {}", e);
            return;
        }
    };
    for agent in agents {
        if !agent.enabled {
            continue;
        }
        if !command_exists(&agent.command).await {
            info!(name = %agent.name, command = %agent.command, "Dynamic CLI agent command not found, skipping");
            continue;
        }
        let args: Vec<String> = match serde_json::from_str(&agent.args_json) {
            Ok(args) => args,
            Err(e) => {
                warn!(
                    name = %agent.name,
                    error = %e,
                    "Dynamic CLI agent has malformed args_json; loading it with no arguments"
                );
                Vec::new()
            }
        };
        let entry = CliToolEntry {
            command: agent.command.clone(),
            args,
            description: agent.description.clone(),
            timeout: agent
                .timeout_secs
                .map(Duration::from_secs)
                .unwrap_or(self.default_timeout),
            max_output_chars: agent.max_output_chars.unwrap_or(self.default_max_output),
            is_dynamic: true,
        };
        let mut tools = self.tools.write().unwrap();
        tools.insert(agent.name.clone(), entry);
        let mut names = self.tool_names.write().unwrap();
        if !names.contains(&agent.name) {
            names.push(agent.name.clone());
            names.sort();
        }
        info!(name = %agent.name, "Loaded dynamic CLI agent from DB");
    }
}
/// `true` when at least one CLI agent is registered.
pub fn has_tools(&self) -> bool {
    let tools = self.tools.read().unwrap();
    !tools.is_empty()
}
/// Persists a new dynamic CLI agent and registers it immediately.
///
/// If `command` is not found, a message is returned and nothing is saved. Store and
/// serialization errors propagate. Registering an existing name replaces its entry.
pub async fn add_agent(
    &self,
    name: &str,
    command: &str,
    args: Vec<String>,
    description: &str,
    timeout_secs: Option<u64>,
    max_output_chars: Option<usize>,
) -> anyhow::Result<String> {
    if !command_exists(command).await {
        return Ok(format!(
            "Command '{}' not found on this system. Install it first.",
            command
        ));
    }
    let args_json = serde_json::to_string(&args)?;
    let record = DynamicCliAgent {
        id: 0,
        name: name.to_string(),
        command: command.to_string(),
        args_json,
        description: description.to_string(),
        timeout_secs,
        max_output_chars,
        enabled: true,
        created_at: String::new(),
    };
    self.state.save_dynamic_cli_agent(&record).await?;

    let entry = CliToolEntry {
        command: command.to_string(),
        args,
        description: description.to_string(),
        timeout: timeout_secs.map_or(self.default_timeout, Duration::from_secs),
        max_output_chars: max_output_chars.unwrap_or(self.default_max_output),
        is_dynamic: true,
    };
    let mut tools = self.tools.write().unwrap();
    tools.insert(name.to_string(), entry);
    let mut names = self.tool_names.write().unwrap();
    if !names.iter().any(|existing| existing == name) {
        names.push(name.to_string());
        names.sort();
    }
    Ok(format!("CLI agent '{}' added successfully.", name))
}
/// Removes a CLI agent from the state store (if it is a dynamic agent) and from the registry.
///
/// Reports "removed" when either happened. A disabled dynamic agent exists only in the store,
/// so it used to be deleted while being reported as "not found". Store errors propagate.
pub async fn remove_agent(&self, name: &str) -> anyhow::Result<String> {
    let agents = self.state.list_dynamic_cli_agents().await?;
    let mut deleted_from_store = false;
    if let Some(agent) = agents.iter().find(|a| a.name == name) {
        self.state.delete_dynamic_cli_agent(agent.id).await?;
        deleted_from_store = true;
    }
    let mut tools = self.tools.write().unwrap();
    let removed_from_registry = tools.remove(name).is_some();
    if removed_from_registry {
        let mut names = self.tool_names.write().unwrap();
        names.retain(|n| n != name);
    }
    if removed_from_registry || deleted_from_store {
        Ok(format!("CLI agent '{}' removed.", name))
    } else {
        Ok(format!("CLI agent '{}' not found.", name))
    }
}
pub async fn enable_agent(&self, name: &str, enabled: bool) -> anyhow::Result<String> {
let agents = self.state.list_dynamic_cli_agents().await?;
if let Some(mut agent) = agents.into_iter().find(|a| a.name == name) {
agent.enabled = enabled;
self.state.update_dynamic_cli_agent(&agent).await?;
if enabled {
if command_exists(&agent.command).await {
let args: Vec<String> =
serde_json::from_str(&agent.args_json).unwrap_or_default();
let entry = CliToolEntry {
command: agent.command.clone(),
args,
description: agent.description.clone(),
timeout: agent
.timeout_secs
.map(Duration::from_secs)
.unwrap_or(self.default_timeout),
max_output_chars: agent.max_output_chars.unwrap_or(self.default_max_output),
is_dynamic: true,
};
let mut tools = self.tools.write().unwrap();
tools.insert(name.to_string(), entry);
let mut names = self.tool_names.write().unwrap();
if !names.contains(&name.to_string()) {
names.push(name.to_string());
names.sort();
}
}
} else {
let mut tools = self.tools.write().unwrap();
tools.remove(name);
let mut names = self.tool_names.write().unwrap();
names.retain(|n| n != name);
}
let action = if enabled { "enabled" } else { "disabled" };
Ok(format!("CLI agent '{}' {}.", name, action))
} else {
let tools = self.tools.read().unwrap();
if tools.contains_key(name) {
Ok(format!(
"CLI agent '{}' is a discovered agent (not dynamic). Cannot toggle — it's always available while installed.",
name
))
} else {
Ok(format!("CLI agent '{}' not found.", name))
}
}
}
/// Registered agents as `(name, description, source, enabled)`, sorted by name.
///
/// `source` is "dynamic" or "discovered". `enabled` is always `true`, because disabled dynamic
/// agents are kept out of the registry.
pub fn list_agents(&self) -> Vec<(String, String, String, bool)> {
    let tools = self.tools.read().unwrap();
    let mut listing = Vec::with_capacity(tools.len());
    for (name, entry) in tools.iter() {
        let source = if entry.is_dynamic { "dynamic" } else { "discovered" };
        listing.push((
            name.clone(),
            entry.description.clone(),
            source.to_string(),
            true,
        ));
    }
    listing.sort_unstable_by(|left, right| left.0.cmp(&right.0));
    listing
}
/// Moves every running agent whose process is no longer alive into `completed`.
///
/// Phase 1 runs under the `running` lock and then the working-dir claims lock:
/// - collect agents whose `child_id` fails `is_process_alive`;
/// - remove them from `running`;
/// - release any working-dir claim they still own.
///
/// Phase 2 runs with no locks held during the report build: each agent's final report is
/// built (`build_finished_result`, which may run git), stored in `completed`, and old entries
/// are pruned.
async fn reap_finished(&self) {
let finished: Vec<(String, RunningCliAgent)> = {
let mut running = self.running.lock().await;
// NOTE(review): is_process_alive spawns a blocking `kill` process per agent while this async lock is held.
let finished_ids: Vec<String> = running
.iter()
.filter(|(_, agent)| !is_process_alive(agent.child_id))
.map(|(id, _)| id.clone())
.collect();
// Lock order is `running` then claims, the same order handle_run uses for its dedup check.
let mut claims = lock_claims(&self.working_dir_claims);
let mut removed: Vec<(String, RunningCliAgent)> = Vec::new();
for task_id in finished_ids {
if let Some(agent) = running.remove(&task_id) {
if let Some(ref dir) = agent.working_dir {
// Claims are released inline: release_working_dir_claim would re-lock the std mutex held here.
let should_remove = claims
.get(dir)
.map(|claim| claim.task_id == task_id)
.unwrap_or(false);
if should_remove {
claims.remove(dir);
}
}
removed.push((task_id, agent));
}
}
removed
};
// Both locks are released here, before any report is built.
for (task_id, agent) in finished {
let final_result = Self::build_finished_result(&agent).await;
let mut completed = self.completed.lock().await;
completed.insert(
task_id.clone(),
CompletedCliAgent {
result: final_result,
completed_at: Instant::now(),
},
);
Self::prune_completed_map(&mut completed);
info!(task_id, tool = %agent.tool_name, "Reaped finished CLI agent");
}
}
/// Builds the prompt sent to a CLI agent.
///
/// The prompt is the optional system instruction plus the task, followed by whichever
/// context sections are non-empty:
/// - the active goal;
/// - project docs (CLAUDE.md/README.md);
/// - relevant facts;
/// - a shallow file listing of `working_dir`;
/// - the `~/projects` listing;
/// - recent conversation.
///
/// A fixed instructions block comes last. When the sections together exceed the byte budget
/// left under `MAX_PROMPT_SIZE`, each one is cut to a fixed share of it. Cuts are made in
/// bytes on a UTF-8 char boundary; previously the cut counted chars, so multibyte text
/// overshot its budget. Store errors just omit the affected section.
async fn build_enriched_prompt(
    &self,
    session_id: &str,
    system_instruction: &str,
    task_prompt: &str,
    working_dir: Option<&str>,
) -> String {
    /// Cuts `text` to at most `max_bytes` bytes (on a char boundary) and appends a truncation
    /// marker; leaves `text` untouched when it already fits.
    fn truncate_section(text: &mut String, max_bytes: usize) {
        if text.len() <= max_bytes {
            return;
        }
        let mut cut = max_bytes;
        while !text.is_char_boundary(cut) {
            cut -= 1;
        }
        text.truncate(cut);
        text.push_str("...[truncated]");
    }

    /// First of CLAUDE.md / README.md readable in `dir`, labelled with its file name and capped
    /// at 3072 chars; empty when neither can be read.
    async fn read_project_docs(dir: &str) -> String {
        for doc_name in ["CLAUDE.md", "README.md"] {
            let doc_path = std::path::Path::new(dir).join(doc_name);
            if let Ok(content) = tokio::fs::read_to_string(&doc_path).await {
                let mut snippet: String = content.chars().take(3072).collect();
                if content.chars().count() > 3072 {
                    snippet.push_str("\n...[truncated]");
                }
                return format!("From {}:\n{}", doc_name, snippet);
            }
        }
        String::new()
    }

    /// Up to 200 relative file paths found up to three directory levels under `dir`, skipping
    /// VCS, dependency and build directories; sorted and newline-joined.
    async fn list_project_files(dir: &str) -> String {
        let cap = 200usize;
        let skip: HashSet<&str> = [
            ".git",
            "node_modules",
            "target",
            "__pycache__",
            ".venv",
            "dist",
            "build",
        ]
        .into_iter()
        .collect();
        let mut file_paths = Vec::new();
        if let Ok(mut level1) = tokio::fs::read_dir(dir).await {
            while let Ok(Some(entry1)) = level1.next_entry().await {
                if file_paths.len() >= cap {
                    break;
                }
                let name1 = entry1.file_name().to_string_lossy().to_string();
                if skip.contains(name1.as_str()) {
                    continue;
                }
                let Ok(ft1) = entry1.file_type().await else {
                    continue;
                };
                if ft1.is_file() {
                    file_paths.push(name1.clone());
                    continue;
                }
                if !ft1.is_dir() {
                    continue;
                }
                if let Ok(mut level2) = tokio::fs::read_dir(entry1.path()).await {
                    while let Ok(Some(entry2)) = level2.next_entry().await {
                        if file_paths.len() >= cap {
                            break;
                        }
                        let name2 = entry2.file_name().to_string_lossy().to_string();
                        if skip.contains(name2.as_str()) {
                            continue;
                        }
                        let rel2 = format!("{}/{}", name1, name2);
                        let Ok(ft2) = entry2.file_type().await else {
                            continue;
                        };
                        if ft2.is_file() {
                            file_paths.push(rel2);
                            continue;
                        }
                        if !ft2.is_dir() {
                            continue;
                        }
                        if let Ok(mut level3) = tokio::fs::read_dir(entry2.path()).await {
                            while let Ok(Some(entry3)) = level3.next_entry().await {
                                if file_paths.len() >= cap {
                                    break;
                                }
                                let Ok(ft3) = entry3.file_type().await else {
                                    continue;
                                };
                                if !ft3.is_file() {
                                    continue;
                                }
                                let name3 = entry3.file_name().to_string_lossy().to_string();
                                if skip.contains(name3.as_str()) {
                                    continue;
                                }
                                file_paths.push(format!("{}/{}/{}", name1, name2, name3));
                            }
                        }
                    }
                }
            }
        }
        file_paths.sort();
        file_paths.join("\n")
    }

    /// At most 80 non-hidden entries of `~/projects` (directories first, each group sorted)
    /// under a header line; empty when the directory is missing or has no such entries.
    async fn list_projects_directory() -> String {
        let Some(home) = dirs::home_dir() else {
            return String::new();
        };
        let projects_dir = home.join("projects");
        let Ok(mut entries) = tokio::fs::read_dir(&projects_dir).await else {
            return String::new();
        };
        let mut dirs_list: Vec<String> = Vec::new();
        let mut files_list: Vec<String> = Vec::new();
        while let Ok(Some(entry)) = entries.next_entry().await {
            let name = entry.file_name().to_string_lossy().to_string();
            if name.starts_with('.') {
                continue;
            }
            if let Ok(ft) = entry.file_type().await {
                if ft.is_dir() {
                    dirs_list.push(format!(" {}/", name));
                } else if ft.is_file() {
                    files_list.push(format!(" {}", name));
                }
            }
            if dirs_list.len() + files_list.len() >= 80 {
                break;
            }
        }
        dirs_list.sort();
        files_list.sort();
        if dirs_list.is_empty() && files_list.is_empty() {
            return String::new();
        }
        let mut listing = format!("{}:\n", projects_dir.display());
        let mut all_entries = dirs_list;
        all_entries.append(&mut files_list);
        listing.push_str(&all_entries.join("\n"));
        listing
    }

    let mut parts: Vec<String> = Vec::new();
    if !system_instruction.trim().is_empty() {
        parts.push(system_instruction.to_string());
    }
    parts.push(format!("## Task\n{}", task_prompt));
    // Bytes left for the context sections, with 200 bytes of slack.
    // NOTE(review): the fixed Instructions section appended at the end is not counted here.
    let budget = MAX_PROMPT_SIZE.saturating_sub(
        parts.iter().map(|p| p.len()).sum::<usize>() + 200,
    );

    // Fetched once and reused for the conversation context and the fact lookup below.
    let history = self.state.get_history(session_id, 10).await.ok();

    let mut context_text = String::new();
    if let Some(history) = history.as_ref() {
        if !history.is_empty() {
            let mut lines = Vec::new();
            for msg in history.iter().rev().take(10) {
                let role = &msg.role;
                let content: String = msg
                    .content
                    .as_deref()
                    .unwrap_or("")
                    .chars()
                    .take(400)
                    .collect();
                lines.push(format!("{}: {}", role, content));
            }
            context_text = lines.join("\n");
        }
    }

    let mut facts_text = String::new();
    {
        // Facts relevant to the task, then to the latest user message, deduplicated by
        // (category, key) and capped at 20.
        let mut seen = HashSet::new();
        let mut facts_accum = Vec::new();
        if let Ok(facts) = self.state.get_relevant_facts(task_prompt, 15).await {
            for fact in facts {
                if seen.insert((fact.category.clone(), fact.key.clone())) {
                    facts_accum.push(fact);
                }
            }
        }
        if let Some(history) = history.as_ref() {
            if let Some(last_user_msg) = history.iter().rev().find(|m| m.role == "user") {
                if let Some(content) = last_user_msg.content.as_deref() {
                    let user_text: String = content.chars().take(500).collect();
                    if user_text != task_prompt {
                        if let Ok(facts) = self.state.get_relevant_facts(&user_text, 10).await {
                            for fact in facts {
                                if seen.insert((fact.category.clone(), fact.key.clone())) {
                                    facts_accum.push(fact);
                                }
                            }
                        }
                    }
                }
            }
        }
        facts_accum.truncate(20);
        if !facts_accum.is_empty() {
            let fact_lines: Vec<String> = facts_accum
                .iter()
                .map(|f| format!("- {}: {}", f.key, f.value))
                .collect();
            facts_text = fact_lines.join("\n");
        }
    }

    let mut active_goal_text = String::new();
    if let Ok(goals) = self.state.get_goals_for_session(session_id).await {
        if let Some(active_goal) = goals
            .iter()
            .find(|g| g.status == "active" || g.status == "in_progress")
        {
            active_goal_text = active_goal.description.chars().take(500).collect();
        }
    }

    let mut project_docs_text = match working_dir {
        Some(dir) => read_project_docs(dir).await,
        None => String::new(),
    };
    let mut files_text = match working_dir {
        Some(dir) => list_project_files(dir).await,
        None => String::new(),
    };
    let mut projects_listing_text = list_projects_directory().await;

    let total = active_goal_text.len()
        + project_docs_text.len()
        + facts_text.len()
        + files_text.len()
        + projects_listing_text.len()
        + context_text.len();
    if total > budget {
        let goal_budget = budget / 10;
        let docs_budget = budget * 3 / 10;
        let facts_budget = budget * 2 / 10;
        let files_budget = budget * 2 / 10;
        let projects_budget = budget / 10;
        let context_budget = budget.saturating_sub(
            goal_budget + docs_budget + facts_budget + files_budget + projects_budget,
        );
        truncate_section(&mut active_goal_text, goal_budget);
        truncate_section(&mut project_docs_text, docs_budget);
        truncate_section(&mut facts_text, facts_budget);
        truncate_section(&mut files_text, files_budget);
        truncate_section(&mut projects_listing_text, projects_budget);
        truncate_section(&mut context_text, context_budget);
    }

    if !active_goal_text.is_empty() {
        parts.push(format!("## Active Goal\n{}", active_goal_text));
    }
    if !project_docs_text.is_empty() {
        parts.push(format!("## Project Documentation\n{}", project_docs_text));
    }
    if !facts_text.is_empty() {
        parts.push(format!("## Known Facts\n{}", facts_text));
    }
    if !files_text.is_empty() {
        parts.push(format!("## Project Files\n{}", files_text));
    }
    if !projects_listing_text.is_empty() {
        parts.push(format!(
            "## Available Project Directories\n{}",
            projects_listing_text
        ));
    }
    if !context_text.is_empty() {
        parts.push(format!("## Conversation Context\n{}", context_text));
    }
    parts.push(
        "## Instructions\n\
        - Focus exclusively on the task above\n\
        - Do NOT attempt to directly inspect or modify aidaemon's state database (SQLite/SQLCipher). Do not run sqlite3/sqlcipher, do not install sqlcipher, and do not look for encryption keys. Use aidaemon tools/APIs instead.\n\
        - Do NOT install system packages (brew/apt/dnf/pacman/pip) unless the user explicitly asked\n\
        - Report what you did and what changed when done"
            .to_string(),
    );
    parts.join("\n\n")
}
/// Summarises file changes in `working_dir`, or returns `None` when it is not a git repository
/// or nothing can be shown.
///
/// - Prefers the working-tree diff (`git diff`, truncated to `MAX_DIFF_SIZE`).
/// - Otherwise falls back to the latest commit: its subject plus `git diff HEAD~1..HEAD`.
/// - If a git command cannot be spawned, returns `None`.
///
/// NOTE(review): `git diff` excludes staged and untracked files, and the commit fallback may
/// describe a commit made before the agent ran.
async fn capture_git_diff(working_dir: &str) -> Option<String> {
    /// Runs `git <args>` in `dir`; `None` if the process could not be spawned.
    async fn git(dir: &str, args: &[&str]) -> Option<std::process::Output> {
        tokio::process::Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .await
            .ok()
    }

    let in_repo = git(working_dir, &["rev-parse", "--git-dir"])
        .await
        .is_some_and(|out| out.status.success());
    if !in_repo {
        return None;
    }

    let stat = git(working_dir, &["diff", "--stat"]).await?;
    if !String::from_utf8_lossy(&stat.stdout).trim().is_empty() {
        let diff = git(working_dir, &["diff"]).await?;
        let diff_text = String::from_utf8_lossy(&diff.stdout);
        if !diff_text.trim().is_empty() {
            return Some(truncate_with_note(&diff_text, MAX_DIFF_SIZE));
        }
    }

    let log = git(working_dir, &["log", "-1", "--stat", "--format=%s"]).await?;
    let log_text = String::from_utf8_lossy(&log.stdout);
    if log_text.trim().is_empty() {
        return None;
    }
    let committed = git(working_dir, &["diff", "HEAD~1..HEAD"]).await?;
    let committed_text = String::from_utf8_lossy(&committed.stdout);
    if committed_text.trim().is_empty() {
        return None;
    }
    let subject = log_text.lines().next().unwrap_or("");
    Some(format!(
        "Committed: {}\n{}",
        subject,
        truncate_with_note(&committed_text, MAX_DIFF_SIZE)
    ))
}
/// Returns a user-facing authentication-failure message for `tool_name` when `output`
/// mentions any authentication-related marker (case-insensitive); otherwise `None`.
fn detect_auth_error(output: &str, tool_name: &str) -> Option<String> {
    const AUTH_MARKERS: [&str; 8] = [
        "authentication",
        "unauthorized",
        "expired",
        "login required",
        "api key",
        "access denied",
        "forbidden",
        "invalid token",
    ];
    let haystack = output.to_lowercase();
    let matched = AUTH_MARKERS
        .iter()
        .any(|marker| haystack.contains(marker));
    matched.then(|| {
        format!(
            "CLI agent '{}' authentication failed. Check that your subscription/API key for {} is valid.",
            tool_name, tool_name
        )
    })
}
/// Asks the model to answer a question a CLI agent posed, on the user's behalf.
///
/// - Questions mentioning credentials get `None` and never reach the model.
/// - The first model from `list_models` is used, or "default" if none are listed.
/// - If the chat call fails, "yes" is returned only for prompts that look like confirmations
///   ("y/n", "yes/no", "confirm", or ending in "?"); otherwise `None`.
///
/// NOTE(review): a successful response without content is answered with "yes", which
/// auto-confirms by default — confirm that is intended.
#[allow(dead_code)]
async fn answer_cli_question(
    provider: &Arc<dyn ModelProvider>,
    task_context: &str,
    recent_output: &str,
    question: &str,
) -> Option<String> {
    const SENSITIVE_MARKERS: [&str; 5] = ["password", "token", "api key", "secret", "credentials"];
    const CONFIRMATION_MARKERS: [&str; 3] = ["y/n", "yes/no", "confirm"];

    let lowered = question.to_lowercase();
    if SENSITIVE_MARKERS.iter().any(|marker| lowered.contains(marker)) {
        return None;
    }
    let prompt = format!(
        "You are answering on behalf of the user. Based on the task context, \
        answer this question from a CLI agent. Be concise (1-2 sentences max).\n\n\
        Task context: {}\n\n\
        Recent agent output:\n{}\n\n\
        Question: {}",
        truncate_str(task_context, 500),
        truncate_str(recent_output, 500),
        question
    );
    let messages = vec![json!({
        "role": "user",
        "content": prompt
    })];
    let models = provider.list_models().await.unwrap_or_default();
    let model = match models.first() {
        Some(first) => first.as_str(),
        None => "default",
    };
    match provider.chat(model, &messages, &[]).await {
        Ok(response) => Some(
            response
                .content
                .as_ref()
                .map_or_else(|| "yes".to_string(), |c| c.trim().to_string()),
        ),
        Err(_) => {
            let looks_like_confirmation = CONFIRMATION_MARKERS
                .iter()
                .any(|marker| lowered.contains(marker))
                || lowered.ends_with('?');
            looks_like_confirmation.then(|| "yes".to_string())
        }
    }
}
/// Runs the configured CLI agent `tool_name` on `prompt` and returns a
/// user-facing result string.
///
/// Steps, in order:
/// 1. Look up the agent's command, args, timeout and output cap in `self.tools`
///    (unknown tool → `Err`). For `claude`, `--print`, `--output-format stream-json`,
///    `--include-partial-messages` and `--verbose` are appended when absent.
/// 2. If the command is not found, disable the matching dynamic agent record and
///    return an explanatory message.
/// 3. If `working_dir` is given, claim it; a directory already claimed by another
///    task yields a `BLOCKED:` message. `claim_guard` is disarmed when the run
///    continues in the background (guard drop behavior is defined elsewhere).
/// 4. Take a concurrency slot (none free → message), build the final prompt via
///    `build_enriched_prompt` when `system_instruction` is set, and spawn the
///    process with stdin closed.
/// 5. A spawned task streams stdout/stderr into buffers capped at `BUFFER_CAP`,
///    sends progress updates, and kills the process on a repeated-line loop, a
///    prohibited terminal command, or an apparent interactive prompt after 15s of
///    silence. On exit it logs the invocation (when logging started), persists the
///    delegated-task result, and notifies the session for background runs.
/// 6. With `async_mode`, the run is registered under a short task id and this
///    returns immediately. Otherwise it waits up to the configured timeout; on
///    timeout the run is moved to the background instead of being killed.
///
/// Refusals are returned as `Ok` messages; spawn and pipe-capture failures return
/// `Err`. When `delegated_task_id` is set, outcomes are also persisted on that task.
#[allow(clippy::too_many_arguments)]
async fn handle_run(
    &self,
    tool_name: &str,
    prompt: &str,
    working_dir: Option<&str>,
    session_id: &str,
    goal_id: Option<&str>,
    delegated_task_id: Option<&str>,
    system_instruction: Option<&str>,
    async_mode: bool,
    status_tx: Option<mpsc::Sender<StatusUpdate>>,
) -> anyhow::Result<String> {
    if let Some(ref tx) = status_tx {
        let task_preview: String = prompt.chars().take(60).collect();
        // Compare char counts (not bytes) so "..." is only added when the preview
        // above actually dropped characters.
        let task_desc = if prompt.chars().count() > 60 {
            format!("{}...", task_preview)
        } else {
            task_preview
        };
        let _ = tx.try_send(StatusUpdate::ToolProgress {
            name: tool_name.to_string(),
            chunk: format!("🚀 Delegating to {}: {}", tool_name, task_desc),
        });
    }
    let (command, mut args, timeout, max_output) = {
        let tools = self.tools.read().unwrap();
        let entry = tools
            .get(tool_name)
            .ok_or_else(|| anyhow::anyhow!("Unknown CLI agent tool: {}", tool_name))?;
        (
            entry.command.clone(),
            entry.args.clone(),
            entry.timeout,
            entry.max_output_chars,
        )
    };
    // Claude runs non-interactively with streaming JSON so progress and tool calls
    // can be parsed line by line in the reader task below.
    if tool_name.eq_ignore_ascii_case("claude") {
        let has_print = args.iter().any(|a| a == "-p" || a == "--print");
        if !has_print {
            args.push("--print".to_string());
        }
        let has_output_format = args.iter().any(|a| a == "--output-format" || a == "-o");
        if !has_output_format {
            args.push("--output-format".to_string());
            args.push("stream-json".to_string());
        }
        let has_partial = args.iter().any(|a| a == "--include-partial-messages");
        if !has_partial {
            args.push("--include-partial-messages".to_string());
        }
        let has_verbose = args.iter().any(|a| a == "--verbose");
        if !has_verbose {
            args.push("--verbose".to_string());
        }
    }
    if !command_exists(&command).await {
        if let Ok(agents) = self.state.list_dynamic_cli_agents().await {
            if let Some(mut agent) = agents.into_iter().find(|a| a.name == tool_name) {
                agent.enabled = false;
                let _ = self.state.update_dynamic_cli_agent(&agent).await;
            }
        }
        let message = format!(
            "CLI agent '{}' command not found. It may have been uninstalled. \
             Use manage_cli_agents to remove it.",
            tool_name
        );
        if let Some(task_id) = delegated_task_id {
            Self::persist_delegated_cli_result_with_state(
                self.state.clone(),
                task_id,
                None,
                Some(&message),
            )
            .await;
        }
        return Ok(message);
    }
    let canonical_working_dir = working_dir.map(Self::normalize_working_dir);
    let dedup_prompt = make_dedup_prompt(prompt);
    let task_id = Uuid::new_v4().to_string()[..8].to_string();
    let short_summary: String = prompt.chars().take(50).collect();
    let mut claim_guard: Option<WorkingDirClaimGuard> = None;
    if let Some(ref dir) = canonical_working_dir {
        let blocked_message: Option<String> = {
            let _running_guard = self.running.lock().await;
            let mut claims = lock_claims(&self.working_dir_claims);
            if let Some(claim) = claims.get(dir) {
                let sim = prompt_similarity(&dedup_prompt, &claim.dedup_prompt);
                if sim > 0.5 {
                    Some(format!(
                        "BLOCKED: A very similar task is already running in {} \
                         (task_id={}, agent={}, similarity={:.0}%). \
                         You MUST wait for it to finish or cancel it.",
                        dir,
                        claim.task_id,
                        claim.tool_name,
                        sim * 100.0
                    ))
                } else {
                    Some(format!(
                        "BLOCKED: Another CLI agent is already working in {} \
                         (task_id={}, agent={}, prompt=\"{}\"). \
                         You MUST wait for it to finish or cancel it before dispatching \
                         another task to the same directory.",
                        dir, claim.task_id, claim.tool_name, claim.prompt_summary
                    ))
                }
            } else {
                claims.insert(
                    dir.clone(),
                    WorkingDirClaim {
                        task_id: task_id.clone(),
                        tool_name: tool_name.to_string(),
                        prompt_summary: short_summary.clone(),
                        dedup_prompt: dedup_prompt.clone(),
                    },
                );
                None
            }
        };
        if let Some(message) = blocked_message {
            if let Some(task_id) = delegated_task_id {
                Self::persist_delegated_cli_result_with_state(
                    self.state.clone(),
                    task_id,
                    Some(&message),
                    None,
                )
                .await;
            }
            return Ok(message);
        }
        claim_guard = Some(WorkingDirClaimGuard::new(
            self.working_dir_claims.clone(),
            dir.clone(),
            task_id.clone(),
        ));
    }
    let final_prompt = if let Some(instruction) = system_instruction {
        self.build_enriched_prompt(
            session_id,
            instruction,
            prompt,
            canonical_working_dir.as_deref(),
        )
        .await
    } else {
        prompt.to_string()
    };
    let slot_permit = match self.concurrency_limiter.clone().try_acquire_owned() {
        Ok(permit) => permit,
        Err(_) => {
            let message = format!(
                "Maximum {} CLI agents already running. Use action='list' to see running tasks, or action='cancel' to stop one.",
                self.max_concurrent
            );
            if let Some(task_id) = delegated_task_id {
                Self::persist_delegated_cli_result_with_state(
                    self.state.clone(),
                    task_id,
                    None,
                    Some(&message),
                )
                .await;
            }
            return Ok(message);
        }
    };
    if let Some(task_id) = delegated_task_id {
        self.persist_delegated_cli_handoff(
            task_id,
            tool_name,
            &final_prompt,
            canonical_working_dir.as_deref(),
        )
        .await;
    }
    info!(
        tool = tool_name,
        session = session_id,
        prompt_len = final_prompt.len(),
        working_dir = canonical_working_dir.as_deref().unwrap_or("(default)"),
        async_mode,
        "CLI agent invocation — runs with auto-approve flags"
    );
    let prompt_summary: String = prompt.chars().take(100).collect();
    // 0 means the start could not be logged; only the completion log depends on it.
    let invocation_id = self
        .state
        .log_cli_agent_start(
            session_id,
            tool_name,
            &prompt_summary,
            canonical_working_dir.as_deref(),
        )
        .await
        .unwrap_or(0);
    let state_for_completion = self.state.clone();
    let delegated_task_id_owned = delegated_task_id.map(|task_id| task_id.to_string());
    let mut cmd = tokio::process::Command::new(&command);
    for arg in &args {
        cmd.arg(arg);
    }
    cmd.arg(&final_prompt);
    if tool_name.eq_ignore_ascii_case("claude") {
        cmd.env_remove("CLAUDECODE");
    }
    if let Some(ref dir) = canonical_working_dir {
        cmd.current_dir(dir);
    }
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    configure_command_for_process_group(&mut cmd);
    info!(
        task_id,
        tool = %tool_name,
        command = %command,
        working_dir = ?canonical_working_dir,
        "Starting CLI agent"
    );
    if let Some(ref tx) = status_tx {
        let _ = tx.try_send(StatusUpdate::ToolCancellable {
            name: tool_name.to_string(),
            task_id: task_id.clone(),
        });
    }
    let started_at_instant = Instant::now();
    let mut child = match cmd.spawn() {
        Ok(child) => child,
        Err(e) => {
            if invocation_id != 0 {
                let duration = started_at_instant.elapsed().as_secs_f64();
                let msg = format!("Failed to spawn CLI agent '{}': {}", tool_name, e);
                let summary: String = msg.chars().take(200).collect();
                let _ = state_for_completion
                    .log_cli_agent_complete(invocation_id, None, &summary, false, duration)
                    .await;
            }
            if let Some(ref delegated_task_id) = delegated_task_id_owned {
                Self::persist_delegated_cli_result_with_state(
                    state_for_completion.clone(),
                    delegated_task_id,
                    None,
                    Some(&format!("Failed to spawn CLI agent '{}': {}", tool_name, e)),
                )
                .await;
            }
            return Err(e.into());
        }
    };
    let pid = child.id().unwrap_or(0);
    drop(child.stdin.take());
    let stdout = match child.stdout.take() {
        Some(stdout) => stdout,
        None => {
            let error = "Failed to capture stdout".to_string();
            if let Some(ref delegated_task_id) = delegated_task_id_owned {
                Self::persist_delegated_cli_result_with_state(
                    self.state.clone(),
                    delegated_task_id,
                    None,
                    Some(&error),
                )
                .await;
            }
            return Err(anyhow::anyhow!(error));
        }
    };
    let stderr = match child.stderr.take() {
        Some(stderr) => stderr,
        None => {
            let error = "Failed to capture stderr".to_string();
            if let Some(ref delegated_task_id) = delegated_task_id_owned {
                Self::persist_delegated_cli_result_with_state(
                    self.state.clone(),
                    delegated_task_id,
                    None,
                    Some(&error),
                )
                .await;
            }
            return Err(anyhow::anyhow!(error));
        }
    };
    let stdout_buf = Arc::new(Mutex::new(String::new()));
    let display_buf = Arc::new(Mutex::new(String::new()));
    let stdout_buf_writer = stdout_buf.clone();
    let display_buf_writer = display_buf.clone();
    let status_tx_clone = status_tx.clone();
    let tool_name_owned = tool_name.to_string();
    let task_context = prompt_summary.clone();
    let (completion_tx, completion_rx) =
        tokio::sync::oneshot::channel::<(Option<i32>, bool, Option<usize>)>();
    // Set later if a synchronous run times out, so the reader task still notifies.
    let should_notify = Arc::new(AtomicBool::new(async_mode));
    let pid_for_kill = pid;
    let invocation_started_at = started_at_instant;
    let max_output_for_log = max_output;
    let notify_session_id = session_id.to_string();
    let notify_async = async_mode;
    let notify_goal_id = goal_id.map(|s| s.to_string()).unwrap_or_default();
    let notify_working_dir = canonical_working_dir.clone();
    let hub_for_completion = self.get_hub();
    let task_id_for_notify = task_id.clone();
    let delegated_task_id_for_completion = delegated_task_id_owned.clone();
    let should_notify_for_task = Arc::clone(&should_notify);
    let slot_permit_for_task = slot_permit;
    tokio::spawn(async move {
        // Holds the concurrency slot until the agent's output is fully handled.
        let _slot_permit = slot_permit_for_task;
        let mut stdout_reader = BufReader::new(stdout).lines();
        let mut stderr_reader = BufReader::new(stderr).lines();
        let mut last_progress = Instant::now();
        let started_at = Instant::now();
        let mut pending_lines: Vec<String> = Vec::new();
        let mut stdout_done = false;
        let mut stderr_done = false;
        let mut last_parsed_action: Option<String> = None;
        let mut loop_detector = LoopDetector::new();
        let mut loop_detected = false;
        let mut loop_pattern_count: Option<usize> = None;
        let mut last_output_time = Instant::now();
        let mut last_non_empty_line = String::new();
        // Silence after which the last line is inspected for an interactive prompt,
        // and whether that inspection already ran for the current silent stretch.
        let stall_timeout = Duration::from_secs(15);
        let mut stall_checked = false;
        loop {
            if stdout_done && stderr_done {
                break;
            }
            if loop_detected {
                info!(
                    pid = pid_for_kill,
                    pattern_count = ?loop_pattern_count,
                    "Infinite loop detected in CLI agent output, killing process"
                );
                kill_process(pid_for_kill).await;
                break;
            }
            // Absolute deadline: the timer keeps running while the agent is silent.
            // A relative sleep gated on elapsed time is only armed if some other
            // branch happens to wake the loop, which silence never does.
            let stall_deadline = tokio::time::Instant::from_std(last_output_time + stall_timeout);
            tokio::select! {
                line = stdout_reader.next_line(), if !stdout_done => {
                    match line {
                        Ok(Some(text)) => {
                            last_output_time = Instant::now();
                            stall_checked = false;
                            if !text.trim().is_empty() {
                                last_non_empty_line = text.clone();
                            }
                            if looks_like_json(&text) {
                                let mut blocked: Option<(String, &'static str)> = None;
                                for cmd in extract_terminal_commands_from_json(&text) {
                                    if let Some(reason) = prohibited_cli_agent_command_reason(&cmd) {
                                        blocked = Some((cmd, reason));
                                        break;
                                    }
                                }
                                if let Some((cmd, reason)) = blocked {
                                    info!(
                                        pid = pid_for_kill,
                                        cmd = %cmd,
                                        "Prohibited CLI agent command detected; killing process"
                                    );
                                    {
                                        let mut buf = display_buf_writer.lock().await;
                                        if buf.len() < BUFFER_CAP {
                                            buf.push_str(&format!(
                                                "[killed] Prohibited CLI agent command: {}\nReason: {}\n",
                                                cmd,
                                                reason
                                            ));
                                        }
                                    }
                                    kill_process(pid_for_kill).await;
                                    break;
                                }
                            }
                            if loop_detector.add_line(&text) && !loop_detected {
                                loop_detected = true;
                                loop_pattern_count = loop_detector.get_loop_pattern();
                            }
                            {
                                let mut buf = stdout_buf_writer.lock().await;
                                if buf.len() < BUFFER_CAP {
                                    buf.push_str(&text);
                                    buf.push('\n');
                                }
                            }
                            {
                                let mut buf = display_buf_writer.lock().await;
                                if buf.len() < BUFFER_CAP {
                                    buf.push_str(&text);
                                    buf.push('\n');
                                }
                            }
                            pending_lines.push(text);
                        }
                        _ => stdout_done = true,
                    }
                }
                line = stderr_reader.next_line(), if !stderr_done => {
                    match line {
                        Ok(Some(text)) => {
                            last_output_time = Instant::now();
                            stall_checked = false;
                            if !text.trim().is_empty() {
                                last_non_empty_line = format!("[stderr] {}", text);
                            }
                            if loop_detector.add_line(&text) && !loop_detected {
                                loop_detected = true;
                                loop_pattern_count = loop_detector.get_loop_pattern();
                            }
                            let mut buf = display_buf_writer.lock().await;
                            if buf.len() < BUFFER_CAP {
                                buf.push_str("[stderr] ");
                                buf.push_str(&text);
                                buf.push('\n');
                            }
                            pending_lines.push(format!("[stderr] {}", text));
                        }
                        _ => stderr_done = true,
                    }
                }
                _ = tokio::time::sleep_until(stall_deadline), if !stall_checked && !last_non_empty_line.is_empty() => {
                    // Inspect once per silent stretch; new output re-arms the check.
                    stall_checked = true;
                    let line = &last_non_empty_line;
                    let lower = line.to_lowercase();
                    let is_question = line.ends_with('?')
                        || lower.contains("y/n")
                        || lower.contains("yes/no")
                        || lower.contains("enter")
                        || lower.contains("confirm")
                        || lower.contains("choose")
                        || lower.contains("select")
                        || lower.contains("which");
                    if is_question {
                        info!(
                            question = %line,
                            task = %task_context,
                            "CLI agent appears stuck waiting for input — killing (stdin is null)"
                        );
                        let mut buf = display_buf_writer.lock().await;
                        if buf.len() < BUFFER_CAP {
                            buf.push_str(&format!("[killed] CLI agent appears stuck waiting for input: {}\n", line));
                        }
                        drop(buf);
                        kill_process(pid_for_kill).await;
                        break;
                    }
                }
            }
            if last_progress.elapsed() >= PROGRESS_INTERVAL {
                if let Some(ref tx) = status_tx_clone {
                    let mut progress_items: Vec<String> = Vec::new();
                    for line in &pending_lines {
                        if looks_like_json(line) {
                            if let Some(progress) = extract_progress_from_json(line) {
                                progress_items.push(progress.clone());
                                last_parsed_action = Some(progress);
                            }
                        } else {
                            progress_items.push(line.clone());
                        }
                    }
                    let elapsed_secs = started_at.elapsed().as_secs();
                    let chunk = if !progress_items.is_empty() {
                        progress_items.dedup();
                        progress_items.join("\n")
                    } else if let Some(ref action) = last_parsed_action {
                        format!("⏳ {} ({}s)", action, elapsed_secs)
                    } else {
                        format!("⏳ Working... ({}s)", elapsed_secs)
                    };
                    let _ = tx.try_send(StatusUpdate::ToolProgress {
                        name: tool_name_owned.clone(),
                        chunk: truncate_with_note(&chunk, 500),
                    });
                }
                pending_lines.clear();
                last_progress = Instant::now();
            }
        }
        // Flush whatever arrived since the last progress tick.
        if !pending_lines.is_empty() {
            if let Some(ref tx) = status_tx_clone {
                let mut progress_items: Vec<String> = Vec::new();
                for line in &pending_lines {
                    if looks_like_json(line) {
                        if let Some(progress) = extract_progress_from_json(line) {
                            progress_items.push(progress);
                        }
                    } else {
                        progress_items.push(line.clone());
                    }
                }
                if !progress_items.is_empty() {
                    progress_items.dedup();
                    let chunk = progress_items.join("\n");
                    let _ = tx.try_send(StatusUpdate::ToolProgress {
                        name: tool_name_owned.clone(),
                        chunk: truncate_with_note(&chunk, 500),
                    });
                }
            }
        }
        let exit_code = if loop_detected {
            None
        } else {
            match child.wait().await {
                Ok(status) => status.code(),
                Err(_) => None,
            }
        };
        let duration = invocation_started_at.elapsed().as_secs_f64();
        let (success, output_summary, structured_response, structured_error) = if loop_detected {
            (
                false,
                "Killed - infinite loop detected".to_string(),
                None,
                Some("Killed - infinite loop detected".to_string()),
            )
        } else {
            let stdout_text = stdout_buf_writer.lock().await.clone();
            let display_text = display_buf_writer.lock().await.clone();
            if exit_code == Some(0) {
                let result_text = extract_meaningful_output(&stdout_text, max_output_for_log);
                let summary: String = result_text.chars().take(200).collect();
                (true, summary, Some(result_text), None)
            } else {
                let auth_msg = CliAgentTool::detect_auth_error(&display_text, &tool_name_owned);
                let summary_src = auth_msg.unwrap_or(display_text);
                let summary: String = summary_src.chars().take(200).collect();
                (false, summary, None, Some(summary_src))
            }
        };
        // Only the invocation log depends on a successful `log_cli_agent_start`;
        // the delegated-task result and completion notice must not be lost when
        // that initial log call failed.
        if invocation_id != 0 {
            let _ = state_for_completion
                .log_cli_agent_complete(
                    invocation_id,
                    exit_code,
                    &output_summary,
                    success,
                    duration,
                )
                .await;
        }
        if let Some(ref delegated_task_id) = delegated_task_id_for_completion {
            CliAgentTool::persist_delegated_cli_result_with_state(
                state_for_completion.clone(),
                delegated_task_id,
                structured_response.as_deref(),
                structured_error.as_deref(),
            )
            .await;
        }
        if notify_async || should_notify_for_task.load(Ordering::Relaxed) {
            let duration_secs = duration as u64;
            let duration_display = if duration_secs >= 60 {
                format!("{}m{}s", duration_secs / 60, duration_secs % 60)
            } else {
                format!("{}s", duration_secs)
            };
            let status_word = if success { "completed" } else { "failed" };
            let diff_section = if let Some(ref dir) = notify_working_dir {
                CliAgentTool::capture_git_diff(dir)
                    .await
                    .map(|diff| format!("\n\n{}", diff))
            } else {
                None
            };
            let message = format!(
                "Background task {} ({}, {})\nTask: {}\nResult: {}{}",
                status_word,
                tool_name_owned,
                duration_display,
                task_context,
                output_summary,
                diff_section.unwrap_or_default(),
            );
            // Prefer direct delivery through the hub; fall back to the
            // notification queue when there is no hub or delivery fails.
            let mut delivered = false;
            if let Some(ref hub) = hub_for_completion {
                if let Err(e) = hub.send_text(&notify_session_id, &message).await {
                    warn!(
                        task_id = %task_id_for_notify,
                        session_id = %notify_session_id,
                        error = %e,
                        "cli_agent background completion direct hub delivery failed"
                    );
                } else {
                    delivered = true;
                }
            }
            if !delivered {
                let notification_type = if success { "completed" } else { "failed" };
                let entry = crate::traits::NotificationEntry::new(
                    &notify_goal_id,
                    &notify_session_id,
                    notification_type,
                    &message,
                );
                if let Err(e) = state_for_completion.enqueue_notification(&entry).await {
                    warn!(
                        task_id = %task_id_for_notify,
                        session_id = %notify_session_id,
                        error = %e,
                        "cli_agent background completion enqueue failed"
                    );
                }
            }
        }
        let _ = completion_tx.send((exit_code, loop_detected, loop_pattern_count));
    });
    if async_mode {
        let working_dir_owned = canonical_working_dir.clone();
        let agent = RunningCliAgent {
            tool_name: tool_name.to_string(),
            prompt_summary: short_summary.clone(),
            started_at: started_at_instant,
            display_buf,
            stdout_buf,
            child_id: pid,
            session_id: session_id.to_string(),
            delegated_task_id: delegated_task_id_owned.clone(),
            working_dir: working_dir_owned,
        };
        self.running.lock().await.insert(task_id.clone(), agent);
        if let Some(guard) = claim_guard.as_mut() {
            guard.disarm();
        }
        return Ok(format!(
            "CLI agent '{}' started in background (task_id={}). \
             Use action=\"check\" with task_id=\"{}\" to see output when done.",
            tool_name, task_id, task_id
        ));
    }
    let working_dir_owned = canonical_working_dir;
    let result = tokio::time::timeout(timeout, completion_rx).await;
    match result {
        Ok(Ok((exit_code, was_loop_killed, loop_count))) => {
            drop(claim_guard.take());
            if was_loop_killed {
                let display_output = display_buf.lock().await.clone();
                let last_lines: String = display_output
                    .lines()
                    .rev()
                    .take(10)
                    .collect::<Vec<_>>()
                    .into_iter()
                    .rev()
                    .collect::<Vec<_>>()
                    .join("\n");
                if let Some(ref tx) = status_tx {
                    let (label, summary) = crate::tools::sanitize::user_facing_tool_activity(
                        tool_name,
                        "killed - infinite loop detected",
                    );
                    let _ = tx.try_send(StatusUpdate::ToolComplete {
                        name: label,
                        summary,
                    });
                }
                return Ok(format!(
                    "ERROR: CLI agent '{}' was automatically killed - INFINITE LOOP DETECTED.\n\n\
                     The same output line repeated {} times in the last 100 lines.\n\
                     This is a known bug in some CLI agent versions where they get stuck.\n\n\
                     Last 10 lines before kill:\n{}\n\n\
                     Do NOT retry with the same agent. Try a different approach or use a different tool.",
                    tool_name,
                    loop_count.unwrap_or(0),
                    last_lines
                ));
            }
            let stdout_output = stdout_buf.lock().await.clone();
            info!(
                tool = %tool_name,
                stdout_len = stdout_output.len(),
                stdout_preview = %truncate_str(&stdout_output, 200),
                "CLI agent stdout captured"
            );
            let result_text = extract_meaningful_output(&stdout_output, max_output);
            info!(
                tool = %tool_name,
                result_len = result_text.len(),
                result_preview = %truncate_str(&result_text, 200),
                "CLI agent result extracted"
            );
            let diff_section = if let Some(ref dir) = working_dir_owned {
                Self::capture_git_diff(dir)
                    .await
                    .map(|diff| format!("\n\n## File Changes\n```diff\n{}\n```", diff))
            } else {
                None
            };
            if let Some(ref tx) = status_tx {
                let raw_summary = if exit_code == Some(0) {
                    "completed successfully".to_string()
                } else {
                    format!("exited with code {:?}", exit_code)
                };
                let (label, summary) =
                    crate::tools::sanitize::user_facing_tool_activity(tool_name, &raw_summary);
                let _ = tx.try_send(StatusUpdate::ToolComplete {
                    name: label,
                    summary,
                });
            }
            if exit_code != Some(0) {
                let display_output = display_buf.lock().await.clone();
                if let Some(auth_msg) = Self::detect_auth_error(&display_output, tool_name) {
                    return Ok(auth_msg);
                }
                let mut error_msg = format!(
                    "ERROR: CLI agent '{}' failed (exit code {:?}).\n\n## Error Output\n{}",
                    tool_name,
                    exit_code,
                    truncate_with_note(&display_output, max_output)
                );
                if let Some(diff) = diff_section {
                    error_msg.push_str(&diff);
                }
                error_msg.push_str(
                    "\n\n## Recovery Options\n\
                     - Try a different CLI agent\n\
                     - Handle the task directly with your own tools\n\
                     - Revert partial changes with `git checkout .` if needed",
                );
                if let Some(task_id) = delegated_task_id {
                    self.persist_delegated_cli_result(task_id, None, Some(&error_msg))
                        .await;
                }
                return Ok(error_msg);
            }
            let mut final_result = result_text;
            if let Some(diff) = diff_section {
                final_result.push_str(&diff);
            }
            if let Some(task_id) = delegated_task_id {
                self.persist_delegated_cli_result(task_id, Some(final_result.as_str()), None)
                    .await;
            }
            Ok(final_result)
        }
        Ok(Err(_)) => {
            drop(claim_guard.take());
            let error_msg =
                format!("ERROR: CLI agent '{}' task failed unexpectedly", tool_name);
            if let Some(task_id) = delegated_task_id {
                self.persist_delegated_cli_result(task_id, None, Some(&error_msg))
                    .await;
            }
            Ok(error_msg)
        }
        Err(_) => {
            // Timed out: keep the process running and hand it to the background.
            let elapsed = timeout.as_secs();
            let partial_output = {
                let buf = display_buf.lock().await;
                truncate_with_note(&buf, 1000)
            };
            should_notify.store(true, Ordering::Relaxed);
            let agent = RunningCliAgent {
                tool_name: tool_name.to_string(),
                prompt_summary: short_summary.clone(),
                started_at: started_at_instant,
                display_buf,
                stdout_buf,
                child_id: pid,
                session_id: session_id.to_string(),
                delegated_task_id: delegated_task_id_owned.clone(),
                working_dir: working_dir_owned,
            };
            self.running.lock().await.insert(task_id.clone(), agent);
            if let Some(guard) = claim_guard.as_mut() {
                guard.disarm();
            }
            Ok(format!(
                "CLI agent '{}' still running after {}s. Moved to background (task_id={}).\n\
                 Use action=\"check\" with task_id=\"{}\" to see output, or action=\"cancel\" to stop it.\n\n\
                 Partial output:\n{}",
                tool_name, elapsed, task_id, task_id, partial_output
            ))
        }
    }
}
/// Marks delegated task `delegated_task_id` as running and records an executor
/// handoff for this CLI run in the task's context.
///
/// When `working_dir` is given, it becomes the single project-scope target and the
/// scope is hard-fail; otherwise no targets are listed. `started_at` is only set
/// if it was empty. Lookup and update failures are ignored (best-effort).
async fn persist_delegated_cli_handoff(
    &self,
    delegated_task_id: &str,
    tool_name: &str,
    prompt: &str,
    working_dir: Option<&str>,
) {
    let cli_label = format!("cli_agent:{tool_name}");
    let targets: Vec<_> = working_dir
        .into_iter()
        .filter_map(|dir| ToolTargetHint::new(ToolTargetHintKind::ProjectScope, dir))
        .collect();
    let handoff = ExecutorHandoff {
        task_id: delegated_task_id.to_string(),
        mission: cli_label.clone(),
        task_description: prompt.to_string(),
        target_scope: TargetScope {
            allowed_targets: targets.clone(),
            hard_fail_outside_scope: working_dir.is_some(),
        },
        expected_targets: targets,
        allowed_tools: Some(vec![cli_label]),
    };
    let Ok(Some(mut task)) = self.state.get_task(delegated_task_id).await else {
        return;
    };
    task.status = "running".to_string();
    if task.started_at.is_none() {
        task.started_at = Some(chrono::Utc::now().to_rfc3339());
    }
    match persist_executor_handoff_context(task.context.as_deref(), &handoff) {
        Ok(updated_context) => task.context = Some(updated_context),
        Err(_) => {}
    }
    let _ = self.state.update_task(&task).await;
}
async fn persist_delegated_cli_result(
&self,
delegated_task_id: &str,
response: Option<&str>,
error: Option<&str>,
) {
Self::persist_delegated_cli_result_with_state(
self.state.clone(),
delegated_task_id,
response,
error,
)
.await;
}
/// Reports on the CLI agent registered under `task_id`.
///
/// If the task is in the running set and its process has exited, returns
/// `build_finished_result` for it. If the process is alive, returns the elapsed
/// time, pid, prompt summary and up to 5000 chars of partial output. If the task
/// is not running, a stored completed result is removed and returned; otherwise
/// a "not found" message is returned.
async fn handle_check(&self, task_id: &str) -> anyhow::Result<String> {
    let running = self.running.lock().await;
    if let Some(agent) = running.get(task_id) {
        let elapsed_secs = agent.started_at.elapsed().as_secs();
        let snapshot = agent.display_buf.lock().await.clone();
        if is_process_alive(agent.child_id) {
            return Ok(format!(
                "CLI agent '{}' still running ({}s elapsed, pid={}).\n\
                 Task: {}...\n\n\
                 Partial output ({} chars):\n{}",
                agent.tool_name,
                elapsed_secs,
                agent.child_id,
                agent.prompt_summary,
                snapshot.len(),
                truncate_with_note(&snapshot, 5000)
            ));
        }
        return Ok(Self::build_finished_result(agent).await);
    }
    // Release the running-set lock before consulting completed results.
    drop(running);
    match self.completed.lock().await.remove(task_id) {
        Some(done) => Ok(done.result),
        None => Ok(format!("No running CLI agent with task_id '{}'", task_id)),
    }
}
/// Cancels the running CLI agent registered under `task_id`.
///
/// Removes the agent from the running set, drops any stored completed entry for
/// the id, kills the process, then releases its working-directory claim and
/// records the cancellation on its delegated task (if any). Returns the output
/// captured before cancellation, truncated to 5000 chars, or a "not found"
/// message when no such agent is running.
async fn handle_cancel(&self, task_id: &str) -> anyhow::Result<String> {
    let agent = {
        let mut running = self.running.lock().await;
        let Some(agent) = running.remove(task_id) else {
            return Ok(format!("No running CLI agent with task_id '{}'", task_id));
        };
        agent
    };
    self.completed.lock().await.remove(task_id);
    let display_output = agent.display_buf.lock().await.clone();
    let elapsed = agent.started_at.elapsed().as_secs();
    // Kill before releasing the claim. Releasing first opens a window in which a
    // new agent can claim this directory while the old process is still exiting.
    kill_process(agent.child_id).await;
    if let Some(ref dir) = agent.working_dir {
        self.release_working_dir_claim(dir, task_id);
    }
    if let Some(ref delegated_task_id) = agent.delegated_task_id {
        Self::persist_delegated_cli_result_with_state(
            self.state.clone(),
            delegated_task_id,
            None,
            Some("Executor CLI run was cancelled before completion."),
        )
        .await;
    }
    Ok(format!(
        "Cancelled CLI agent '{}' (was running for {}s).\n\nOutput before cancellation:\n{}",
        agent.tool_name,
        elapsed,
        truncate_with_note(&display_output, 5000)
    ))
}
/// Cancels every running CLI agent that belongs to `session_id`.
///
/// Matching agents are removed from the running set under a single lock. Each one
/// then has its stored completed entry dropped and is killed. Its
/// working-directory claim is released and the cancellation is recorded on its
/// delegated task (if any). Returns a summary listing "tool (task_id)" per
/// cancelled agent.
async fn handle_cancel_all(&self, session_id: &str) -> anyhow::Result<String> {
    let to_cancel: Vec<(String, RunningCliAgent)> = {
        let mut running = self.running.lock().await;
        let task_ids: Vec<String> = running
            .iter()
            .filter(|(_, agent)| agent.session_id == session_id)
            .map(|(task_id, _)| task_id.clone())
            .collect();
        let mut removed = Vec::new();
        for task_id in task_ids {
            if let Some(agent) = running.remove(&task_id) {
                removed.push((task_id, agent));
            }
        }
        removed
    };
    if to_cancel.is_empty() {
        return Ok("No running CLI agents for this session.".to_string());
    }
    let mut cancelled = Vec::new();
    for (task_id, agent) in to_cancel {
        self.completed.lock().await.remove(&task_id);
        // Kill before releasing the claim so no new run can start in this
        // directory while the old process is still shutting down.
        kill_process(agent.child_id).await;
        if let Some(ref dir) = agent.working_dir {
            self.release_working_dir_claim(dir, &task_id);
        }
        if let Some(ref delegated_task_id) = agent.delegated_task_id {
            Self::persist_delegated_cli_result_with_state(
                self.state.clone(),
                delegated_task_id,
                None,
                Some("Executor CLI run was cancelled before completion."),
            )
            .await;
        }
        cancelled.push(format!("{} ({})", agent.tool_name, task_id));
    }
    Ok(format!(
        "Cancelled {} CLI agent(s): {}",
        cancelled.len(),
        cancelled.join(", ")
    ))
}
/// Lists every agent in the running set, one line each:
/// `task_id - tool (running|finished, Ns): prompt summary...`.
/// The status is based on whether the agent's pid is still alive.
async fn handle_list(&self) -> anyhow::Result<String> {
    let running = self.running.lock().await;
    if running.is_empty() {
        return Ok("No CLI agents currently running.".to_string());
    }
    let rows = running.iter().map(|(task_id, agent)| {
        let secs = agent.started_at.elapsed().as_secs();
        let state = match is_process_alive(agent.child_id) {
            true => "running",
            false => "finished",
        };
        format!(
            " {} - {} ({}, {}s): {}...",
            task_id, agent.tool_name, state, secs, agent.prompt_summary
        )
    });
    let report: Vec<String> = std::iter::once("Running CLI agents:".to_string())
        .chain(rows)
        .collect();
    Ok(report.join("\n"))
}
}
#[derive(Deserialize)]
struct CliAgentArgs {
#[serde(default)]
action: Option<String>,
tool: Option<String>,
prompt: Option<String>,
mission: Option<String>,
task: Option<String>,
command: Option<String>,
description: Option<String>,
working_dir: Option<String>,
task_id: Option<String>,
system_instruction: Option<String>,
#[serde(default)]
async_mode: Option<bool>,
#[serde(default)]
_session_id: Option<String>,
#[serde(default)]
_goal_id: Option<String>,
#[serde(default)]
_task_id: Option<String>,
#[serde(default)]
_user_role: Option<String>,
}
/// Returns `value` with surrounding whitespace trimmed, or `None` when it is
/// absent or consists only of whitespace.
fn non_empty_prompt_field(value: Option<&str>) -> Option<String> {
    value
        .map(str::trim)
        .filter(|text| !text.is_empty())
        .map(String::from)
}
/// Joins an optional mission and task into a prompt of the form
/// `"Mission: …\nTask: …"`, omitting whichever is absent.
/// Returns `None` only when both are absent.
fn compose_delegated_prompt(mission: Option<String>, task: Option<String>) -> Option<String> {
    let lines: Vec<String> = mission
        .map(|text| format!("Mission: {}", text))
        .into_iter()
        .chain(task.map(|text| format!("Task: {}", text)))
        .collect();
    if lines.is_empty() {
        None
    } else {
        Some(lines.join("\n"))
    }
}
impl CliAgentArgs {
    /// Builds the prompt for a run from the explicit arguments.
    ///
    /// Precedence:
    /// 1. a non-blank `prompt`;
    /// 2. "Mission:"/"Task:" lines from non-blank `mission`/`task`;
    /// 3. a non-blank `description` and/or a wrapped `command`, separated by a
    ///    blank line.
    ///
    /// Returns `None` when every candidate is absent or blank.
    fn run_prompt(&self) -> Option<String> {
        if let Some(prompt) = non_empty_prompt_field(self.prompt.as_deref()) {
            return Some(prompt);
        }
        if let Some(prompt) = compose_delegated_prompt(
            non_empty_prompt_field(self.mission.as_deref()),
            non_empty_prompt_field(self.task.as_deref()),
        ) {
            return Some(prompt);
        }
        let description = non_empty_prompt_field(self.description.as_deref());
        let command = non_empty_prompt_field(self.command.as_deref());
        if description.is_some() || command.is_some() {
            let mut parts = Vec::new();
            if let Some(description) = description {
                parts.push(description);
            }
            if let Some(command) = command {
                parts.push(format!(
                    "Run this exact shell command in the working directory and report the result:\n{}",
                    command
                ));
            }
            return Some(parts.join("\n\n"));
        }
        None
    }

    /// Builds a fallback prompt from the delegated goal and task stored in `state`.
    ///
    /// The task is looked up by `_task_id`. The goal is looked up by `_goal_id`, or
    /// by the task's `goal_id` when `_goal_id` is absent or blank. Their
    /// descriptions become "Mission:"/"Task:" lines. Lookup errors are treated as
    /// "not found".
    async fn contextual_run_prompt(&self, state: &Arc<dyn StateStore>) -> Option<String> {
        let delegated_task = match self._task_id.as_deref() {
            Some(task_id) if !task_id.trim().is_empty() => {
                state.get_task(task_id).await.ok().flatten()
            }
            _ => None,
        };
        // A blank explicit goal id must not hide the delegated task's own goal;
        // previously Some("") suppressed the fallback and dropped the mission line.
        let delegated_goal_id = self
            ._goal_id
            .clone()
            .filter(|goal_id| !goal_id.trim().is_empty())
            .or_else(|| delegated_task.as_ref().map(|task| task.goal_id.clone()));
        let delegated_goal = match delegated_goal_id.as_deref() {
            Some(goal_id) if !goal_id.trim().is_empty() => {
                state.get_goal(goal_id).await.ok().flatten()
            }
            _ => None,
        };
        compose_delegated_prompt(
            delegated_goal
                .as_ref()
                .and_then(|goal| non_empty_prompt_field(Some(goal.description.as_str()))),
            delegated_task
                .as_ref()
                .and_then(|task| non_empty_prompt_field(Some(task.description.as_str()))),
        )
    }
}
/// Cheap pre-filter: true when `s`, ignoring leading whitespace, starts like a
/// JSON object or array. The rest of the text is not validated.
fn looks_like_json(s: &str) -> bool {
    matches!(s.trim_start().chars().next(), Some('{' | '['))
}
/// Turns one JSON output line into a short, emoji-prefixed progress message.
///
/// Recognized shapes:
/// - a top-level tool call with a string `name` (and optional `input`): Read,
///   Write/Edit, Bash/terminal, Glob and Grep get a detailed message; anything
///   else, or a known tool missing the relevant input field, becomes
///   "🔧 Using: <name>";
/// - an `assistant` event: the first `tool_use` item in `message.content` is
///   described (Bash/terminal, Read, Write/Edit, Glob, Grep, Task), falling back
///   to "🔧 <name>"; no such item yields `None`;
/// - a `tool_use` event with a string `tool`, and a `thinking` event.
///
/// Returns `None` for unparsable JSON and unrecognized events. Paths are shown as
/// their last 50 (top-level) or 40 (assistant) chars; commands and patterns are
/// cut to a fixed number of leading chars.
fn extract_progress_from_json(line: &str) -> Option<String> {
    /// String stored under `key` in `obj`, if present and a string.
    fn str_field<'a>(obj: &'a Value, key: &str) -> Option<&'a str> {
        obj.get(key).and_then(|field| field.as_str())
    }
    /// Like `str_field`, for an object that may itself be missing.
    fn opt_str_field<'a>(obj: Option<&'a Value>, key: &str) -> Option<&'a str> {
        obj.and_then(|inner| str_field(inner, key))
    }
    /// First `count` chars of `text`.
    fn head(text: &str, count: usize) -> String {
        text.chars().take(count).collect()
    }
    /// Last `count` chars of `text`, kept in their original order.
    fn tail(text: &str, count: usize) -> String {
        let chars: Vec<char> = text.chars().collect();
        chars[chars.len().saturating_sub(count)..].iter().collect()
    }

    let event: Value = serde_json::from_str(line.trim()).ok()?;

    if let Some(tool_name) = str_field(&event, "name") {
        let detail = event.get("input").and_then(|input| match tool_name {
            "Read" | "read" => str_field(input, "file_path")
                .map(|path| format!("📖 Reading: ...{}", tail(path, 50))),
            "Write" | "write" | "Edit" | "edit" => str_field(input, "file_path")
                .map(|path| format!("✏️ Writing: ...{}", tail(path, 50))),
            "Bash" | "bash" | "terminal" => str_field(input, "command")
                .map(|cmd| format!("⚡ Running: {}", head(cmd, 60))),
            "Glob" | "glob" => str_field(input, "pattern")
                .map(|pattern| format!("🔍 Searching: {}", pattern)),
            "Grep" | "grep" => str_field(input, "pattern")
                .map(|pattern| format!("🔍 Grep: {}", head(pattern, 40))),
            _ => None,
        });
        return Some(detail.unwrap_or_else(|| format!("🔧 Using: {}", tool_name)));
    }

    match str_field(&event, "type")? {
        "assistant" => {
            let tool_use = event
                .get("message")
                .and_then(|message| message.get("content"))
                .and_then(|content| content.as_array())?
                .iter()
                .find(|item| str_field(item, "type") == Some("tool_use"))?;
            let name = str_field(tool_use, "name").unwrap_or("unknown");
            let input = tool_use.get("input");
            let detail = match name {
                "Bash" | "bash" | "terminal" => opt_str_field(input, "command")
                    .map(|cmd| format!("⚡ {}", head(cmd, 50))),
                "Read" | "read" => opt_str_field(input, "file_path")
                    .map(|path| format!("📖 ...{}", tail(path, 40))),
                "Write" | "write" | "Edit" | "edit" => opt_str_field(input, "file_path")
                    .map(|path| format!("✏️ ...{}", tail(path, 40))),
                "Glob" | "glob" => opt_str_field(input, "pattern")
                    .map(|pattern| format!("🔍 {}", pattern)),
                "Grep" | "grep" => opt_str_field(input, "pattern")
                    .map(|pattern| format!("🔍 grep: {}", head(pattern, 30))),
                "Task" => opt_str_field(input, "description")
                    .map(|desc| format!("🚀 {}", desc)),
                _ => None,
            };
            Some(detail.unwrap_or_else(|| format!("🔧 {}", name)))
        }
        "tool_use" => str_field(&event, "tool").map(|tool| format!("🔧 Using: {}", tool)),
        "thinking" => Some("💭 Thinking...".to_string()),
        _ => None,
    }
}
/// Returns why `command` must not be run by a CLI agent, or `None` if allowed.
///
/// Matching is an ASCII case-insensitive substring check:
/// - a command mentioning both `install` and `sqlcipher` gets the
///   install-specific reason;
/// - otherwise, any mention of `aidaemon.db`, `sqlite3`, `sqlcipher`,
///   `pragma key` or `pragma cipher` gets the database-manipulation reason.
fn prohibited_cli_agent_command_reason(command: &str) -> Option<&'static str> {
    const DB_MARKERS: [&str; 5] = [
        "aidaemon.db",
        "sqlite3",
        "sqlcipher",
        "pragma key",
        "pragma cipher",
    ];
    let lower = command.to_ascii_lowercase();
    // Must be checked first: every command that installs sqlcipher also contains
    // "sqlcipher", so the broader database check would otherwise always win and
    // make this more specific reason unreachable.
    if lower.contains("install") && lower.contains("sqlcipher") {
        return Some(
            "Installing sqlcipher is prohibited in cli_agent runs. Use aidaemon tools instead.",
        );
    }
    if DB_MARKERS.iter().any(|marker| lower.contains(*marker)) {
        return Some("Direct manipulation of aidaemon's state database is prohibited. Use manage_memories / built-in tools instead of terminal SQL.");
    }
    None
}
/// Collects every shell command string embedded in one JSON output line.
///
/// Recognized shapes (all are checked; a line may contribute several commands):
/// - a top-level tool call `{"name": <shell tool>, "input": {"command": ...}}`;
/// - an `assistant` event whose `message.content` array holds `tool_use` items
///   with a shell-tool `name` and `input.command`;
/// - a `tool_use` event `{"type": "tool_use", "tool": <shell tool>, "command": ...}`.
///
/// Shell tools are `bash` and `terminal`, matched ASCII case-insensitively in every
/// shape. Lines that are not valid JSON yield an empty vector.
fn extract_terminal_commands_from_json(line: &str) -> Vec<String> {
    // One matcher for every shape. Previously only the `tool_use` shape lowercased
    // the name, so the prohibited-command guard could be bypassed via e.g. "BASH"
    // or "Terminal" in the other two shapes.
    fn is_shell_tool(name: &str) -> bool {
        name.eq_ignore_ascii_case("bash") || name.eq_ignore_ascii_case("terminal")
    }
    let Ok(v) = serde_json::from_str::<Value>(line.trim()) else {
        return vec![];
    };
    let mut out = Vec::new();
    if let Some(tool_name) = v.get("name").and_then(|n| n.as_str()) {
        if is_shell_tool(tool_name) {
            if let Some(cmd) = v
                .get("input")
                .and_then(|i| i.get("command"))
                .and_then(|c| c.as_str())
            {
                out.push(cmd.to_string());
            }
        }
    }
    let event_type = v.get("type").and_then(|t| t.as_str());
    if event_type == Some("assistant") {
        if let Some(items) = v
            .get("message")
            .and_then(|m| m.get("content"))
            .and_then(|c| c.as_array())
        {
            for item in items {
                if item.get("type").and_then(|t| t.as_str()) != Some("tool_use") {
                    continue;
                }
                let name = item.get("name").and_then(|n| n.as_str()).unwrap_or("");
                if !is_shell_tool(name) {
                    continue;
                }
                if let Some(cmd) = item
                    .get("input")
                    .and_then(|i| i.get("command"))
                    .and_then(|c| c.as_str())
                {
                    out.push(cmd.to_string());
                }
            }
        }
    }
    if event_type == Some("tool_use") {
        let tool = v.get("tool").and_then(|t| t.as_str()).unwrap_or("");
        if is_shell_tool(tool) {
            if let Some(cmd) = v.get("command").and_then(|c| c.as_str()) {
                out.push(cmd.to_string());
            }
        }
    }
    out
}
/// Reduces raw agent output to the text worth reporting, capped at `max_chars`.
///
/// Preference order: text pulled from a single JSON document
/// (`extract_json_content`), then text pulled from a JSON-lines stream
/// (`extract_jsonl_content`), and finally the raw output unchanged. The chosen
/// text is passed through `truncate_with_note`.
fn extract_meaningful_output(raw: &str, max_chars: usize) -> String {
    // `or_else` keeps the JSONL scan lazy: it only runs if the single-document
    // parse found nothing.
    let structured = extract_json_content(raw).or_else(|| extract_jsonl_content(raw));
    match structured {
        Some(text) => truncate_with_note(&text, max_chars),
        None => truncate_with_note(raw, max_chars),
    }
}
fn extract_json_content(raw: &str) -> Option<String> {
let v: Value = serde_json::from_str(raw).ok()?;
if let Some(result) = v.get("result").and_then(|r| r.as_str()) {
return Some(result.to_string());
}
if let Some(output) = v.get("output").and_then(|o| o.as_str()) {
return Some(output.to_string());
}
if let Some(content) = v.get("content").and_then(|c| c.as_str()) {
return Some(content.to_string());
}
if let Some(message) = v.get("message").and_then(|m| m.as_str()) {
return Some(message.to_string());
}
None
}
fn extract_jsonl_content(raw: &str) -> Option<String> {
let mut last_content: Option<String> = None;
for line in raw.lines().rev() {
if let Ok(v) = serde_json::from_str::<Value>(line) {
if let Some(content) = v
.pointer("/item/content")
.or_else(|| v.pointer("/content"))
.or_else(|| v.pointer("/result"))
{
if let Some(text) = content.as_str() {
last_content = Some(text.to_string());
break;
}
if let Some(arr) = content.as_array() {
let texts: Vec<&str> = arr
.iter()
.filter_map(|item| item.get("text").and_then(|t| t.as_str()))
.collect();
if !texts.is_empty() {
last_content = Some(texts.join("\n"));
break;
}
}
}
}
}
last_content
}
#[async_trait]
impl Tool for CliAgentTool {
fn name(&self) -> &str {
"cli_agent"
}
fn description(&self) -> &str {
"Delegate a task to a CLI-based AI coding agent running on this machine"
}
fn schema(&self) -> Value {
let tool_names = self.tool_names.read().unwrap();
let tools_help = tool_names.join(", ");
let names_vec: Vec<Value> = tool_names.iter().map(|n| json!(n)).collect();
json!({
"name": "cli_agent",
"description": format!(
"Delegate complex multi-step coding/research/analysis work to an installed CLI agent. \
Available agents: {}. If `tool` is omitted, the runtime auto-selects the first installed \
agent (claude, gemini, codex, copilot, aider). Use manage_memories for scheduling. \
Long runs can be checked or cancelled.",
tools_help
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["run", "check", "cancel", "list"],
"description": "run, check, cancel, or list"
},
"tool": {
"type": "string",
"enum": names_vec,
"description": "Agent name"
},
"prompt": {
"type": "string",
"description": "Task prompt"
},
"working_dir": {
"type": "string",
"description": "Absolute working directory. Always set this so the runtime can detect conflicts; two agents must not run concurrently in the same working_dir."
},
"task_id": {
"type": "string",
"description": "Task ID"
},
"system_instruction": {
"type": "string",
"description": "Optional system instruction shaping the agent into a specialist (e.g. 'You are a security auditor')"
},
"async_mode": {
"type": "boolean",
"description": "Run in background; use true to dispatch independent sub-tasks in parallel"
}
},
"required": ["action"],
"additionalProperties": false,
"anyOf": [
{
"required": ["action", "prompt"],
"properties": {
"action": {
"enum": ["run"]
}
}
},
{
"required": ["action", "task_id"],
"properties": {
"action": {
"enum": ["check", "cancel"]
}
}
},
{
"required": ["action"],
"properties": {
"action": {
"enum": ["list"]
}
}
}
]
}
})
}
fn capabilities(&self) -> ToolCapabilities {
ToolCapabilities {
read_only: false,
external_side_effect: true,
needs_approval: true,
idempotent: false,
high_impact_write: true,
}
}
fn is_available(&self) -> bool {
self.has_tools()
}
async fn call(&self, arguments: &str) -> anyhow::Result<String> {
self.call_with_status(arguments, None).await
}
async fn call_with_status(
&self,
arguments: &str,
status_tx: Option<mpsc::Sender<StatusUpdate>>,
) -> anyhow::Result<String> {
let args: CliAgentArgs = serde_json::from_str(arguments)?;
self.reap_finished().await;
let action = args.action.as_deref().unwrap_or("run");
let mut session_id = args._session_id.clone().unwrap_or_default();
if let Some(ref goal_id) = args._goal_id {
if let Ok(Some(goal)) = self.state.get_goal(goal_id).await {
if !goal.session_id.trim().is_empty() {
session_id = goal.session_id;
}
}
}
match action {
"run" => {
let tool = args
.tool
.clone()
.or_else(|| self.default_tool_name())
.ok_or_else(|| anyhow::anyhow!("No CLI agents available for action=run"))?;
let prompt = if let Some(prompt) = args.run_prompt() {
prompt
} else if let Some(prompt) = args.contextual_run_prompt(&self.state).await {
warn!(
task_id = ?args._task_id,
goal_id = ?args._goal_id,
working_dir = ?args.working_dir,
"cli_agent run omitted prompt; synthesized delegated prompt from stored task/goal context"
);
prompt
} else {
warn!(
task_id = ?args._task_id,
goal_id = ?args._goal_id,
has_prompt = args.prompt.as_ref().is_some_and(|s| !s.trim().is_empty()),
has_mission = args.mission.as_ref().is_some_and(|s| !s.trim().is_empty()),
has_task = args.task.as_ref().is_some_and(|s| !s.trim().is_empty()),
has_description = args
.description
.as_ref()
.is_some_and(|s| !s.trim().is_empty()),
has_command = args.command.as_ref().is_some_and(|s| !s.trim().is_empty()),
"cli_agent run missing prompt and had no recoverable delegated context"
);
anyhow::bail!("Missing 'prompt' parameter for action=run");
};
let mut daemon_hits = detect_daemonization_primitives(&prompt);
if let Some(system_instruction) = args.system_instruction.as_deref() {
for hit in detect_daemonization_primitives(system_instruction) {
if !daemon_hits.contains(&hit) {
daemon_hits.push(hit);
}
}
}
if !daemon_hits.is_empty() {
if !Self::is_owner_role(args._user_role.as_deref()) {
return Ok(format!(
"Blocked: daemonization primitives detected in cli_agent prompt ({}) and only owners can approve detached/background execution.",
daemon_hits.join(", ")
));
}
if session_id.trim().is_empty() {
return Ok(
"Blocked: daemonization primitives require owner approval in an interactive session, but no session_id was provided."
.to_string(),
);
}
match self
.request_daemonization_approval(
session_id.trim(),
&tool,
&prompt,
&daemon_hits,
)
.await
{
Ok(ApprovalResponse::Deny) => {
return Ok("Daemonizing cli_agent run denied by owner.".to_string());
}
Ok(
ApprovalResponse::AllowOnce
| ApprovalResponse::AllowSession
| ApprovalResponse::AllowAlways,
) => {}
Err(e) => {
return Ok(format!(
"Could not get owner approval for daemonizing cli_agent run: {}",
e
));
}
}
}
let async_mode = if args._goal_id.is_some() {
false
} else {
args.async_mode.unwrap_or(false)
};
self.handle_run(
&tool,
&prompt,
args.working_dir.as_deref(),
&session_id,
args._goal_id.as_deref(),
args._task_id.as_deref(),
args.system_instruction.as_deref(),
async_mode,
status_tx,
)
.await
}
"check" => {
let task_id = args.task_id.as_ref().ok_or_else(|| {
anyhow::anyhow!("Missing 'task_id' parameter for action=check")
})?;
self.handle_check(task_id).await
}
"cancel" => {
let task_id = args.task_id.as_ref().ok_or_else(|| {
anyhow::anyhow!("Missing 'task_id' parameter for action=cancel")
})?;
self.handle_cancel(task_id).await
}
"cancel_all" => self.handle_cancel_all(&session_id).await,
"list" => self.handle_list().await,
_ => Ok(format!(
"Unknown action '{}'. Use run, check, cancel, cancel_all, or list.",
action
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::CliAgentsConfig;
use crate::memory::embeddings::EmbeddingService;
use crate::state::SqliteStateStore;
use crate::testing::MockProvider;
use crate::traits::store_prelude::*;
use crate::traits::Tool;
use crate::traits::{Goal, Task};
use std::collections::HashMap;
use std::sync::Arc;
fn extract_task_id_from_background_message(msg: &str) -> String {
let marker = "task_id=";
let start = msg
.find(marker)
.expect("background response should include task_id")
+ marker.len();
msg[start..]
.chars()
.take_while(|c| c.is_ascii_alphanumeric() || *c == '-' || *c == '_')
.collect()
}
async fn setup_echo_tool() -> (CliAgentTool, tempfile::NamedTempFile) {
let db_file = tempfile::NamedTempFile::new().unwrap();
let embedding_service = Arc::new(EmbeddingService::new().unwrap());
let state = Arc::new(
SqliteStateStore::new(
db_file.path().to_str().unwrap(),
100,
None,
embedding_service,
)
.await
.unwrap(),
);
let provider = Arc::new(MockProvider::new());
let (approval_tx_raw, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
let approval_tx = ApprovalBroker::new(approval_tx_raw);
let mut tools_map = HashMap::new();
tools_map.insert(
"echo".to_string(),
CliToolEntry {
command: "echo".to_string(),
args: vec![],
description: "Echo agent for testing".to_string(),
timeout: Duration::from_secs(10),
max_output_chars: 10000,
is_dynamic: false,
},
);
let tool = CliAgentTool {
tools: Arc::new(std::sync::RwLock::new(tools_map)),
tool_names: Arc::new(std::sync::RwLock::new(vec!["echo".to_string()])),
running: Arc::new(Mutex::new(HashMap::new())),
completed: Arc::new(Mutex::new(HashMap::new())),
working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
state: state as Arc<dyn StateStore>,
llm_runtime: SharedLlmRuntime::new(
provider as Arc<dyn crate::traits::ModelProvider>,
None,
crate::config::ProviderKind::OpenaiCompatible,
"mock".to_string(),
),
default_timeout: Duration::from_secs(10),
default_max_output: 10000,
max_concurrent: 3,
concurrency_limiter: Arc::new(Semaphore::new(3)),
approval_tx,
hub: OnceLock::new(),
};
(tool, db_file)
}
async fn setup_bash_tool() -> (CliAgentTool, tempfile::NamedTempFile) {
let db_file = tempfile::NamedTempFile::new().unwrap();
let embedding_service = Arc::new(EmbeddingService::new().unwrap());
let state = Arc::new(
SqliteStateStore::new(
db_file.path().to_str().unwrap(),
100,
None,
embedding_service,
)
.await
.unwrap(),
);
let provider = Arc::new(MockProvider::new());
let (approval_tx_raw, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
let approval_tx = ApprovalBroker::new(approval_tx_raw);
let mut tools_map = HashMap::new();
tools_map.insert(
"bash-agent".to_string(),
CliToolEntry {
command: "bash".to_string(),
args: vec!["-c".to_string()],
description: "Bash agent for testing".to_string(),
timeout: Duration::from_secs(10),
max_output_chars: 10000,
is_dynamic: false,
},
);
let tool = CliAgentTool {
tools: Arc::new(std::sync::RwLock::new(tools_map)),
tool_names: Arc::new(std::sync::RwLock::new(vec!["bash-agent".to_string()])),
running: Arc::new(Mutex::new(HashMap::new())),
completed: Arc::new(Mutex::new(HashMap::new())),
working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
state: state as Arc<dyn StateStore>,
llm_runtime: SharedLlmRuntime::new(
provider as Arc<dyn crate::traits::ModelProvider>,
None,
crate::config::ProviderKind::OpenaiCompatible,
"mock".to_string(),
),
default_timeout: Duration::from_secs(10),
default_max_output: 10000,
max_concurrent: 3,
concurrency_limiter: Arc::new(Semaphore::new(3)),
approval_tx,
hub: OnceLock::new(),
};
(tool, db_file)
}
#[tokio::test]
async fn test_run_echo_returns_output() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(r#"{"action":"run","tool":"echo","prompt":"hello world"}"#)
.await
.unwrap();
assert!(
result.contains("hello world"),
"Expected 'hello world' in output, got: {}",
result
);
}
#[tokio::test]
async fn test_run_bash_script_returns_output() {
let (tool, _db) = setup_bash_tool().await;
let result = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"echo 'test output 42'"}"#)
.await
.unwrap();
assert!(
result.contains("test output 42"),
"Expected 'test output 42' in output, got: {}",
result
);
}
#[tokio::test]
async fn test_run_daemonization_prompt_blocked_for_non_owner() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(
r#"{"action":"run","tool":"echo","prompt":"nohup echo hi &","_session_id":"sess1","_user_role":"Guest"}"#,
)
.await
.unwrap();
assert!(
result.contains("Blocked: daemonization primitives"),
"Expected daemonization guard block, got: {}",
result
);
}
#[tokio::test]
async fn test_run_captures_exit_code_failure() {
let (tool, _db) = setup_bash_tool().await;
let result = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"echo 'failing' >&2; exit 1"}"#)
.await
.unwrap();
assert!(
result.contains("ERROR"),
"Expected ERROR in output for exit code 1, got: {}",
result
);
assert!(
result.contains("failing"),
"Expected stderr in error output, got: {}",
result
);
}
#[tokio::test]
async fn test_run_unknown_tool() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(r#"{"action":"run","tool":"nonexistent","prompt":"test"}"#)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_run_missing_tool_param() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(r#"{"action":"run","prompt":"test"}"#)
.await
.unwrap();
assert!(
result.contains("test"),
"Expected default tool fallback output, got: {}",
result
);
}
#[tokio::test]
async fn test_run_missing_prompt_param() {
let (tool, _db) = setup_echo_tool().await;
let result = tool.call(r#"{"action":"run","tool":"echo"}"#).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_process_completes_without_hanging_on_stdin() {
let (tool, _db) = setup_bash_tool().await;
let start = Instant::now();
let result = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"echo 'done'; exit 0"}"#)
.await
.unwrap();
let elapsed = start.elapsed();
assert!(
result.contains("done"),
"Expected 'done' in output, got: {}",
result
);
assert!(
elapsed < Duration::from_secs(5),
"Process took {:?} — likely hanging on stdin",
elapsed
);
}
#[tokio::test]
async fn test_cat_stdin_completes_quickly() {
let (tool, _db) = setup_bash_tool().await;
let start = Instant::now();
let result = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"cat; echo 'cat done'"}"#)
.await
.unwrap();
let elapsed = start.elapsed();
assert!(
result.contains("cat done"),
"Expected 'cat done' in output, got: {}",
result
);
assert!(
elapsed < Duration::from_secs(5),
"`cat` took {:?} — stdin not null?",
elapsed
);
}
#[tokio::test]
async fn test_concurrent_limit_enforced() {
let (tool, _db) = setup_bash_tool().await;
for _ in 0..3 {
tool.call(
r#"{"action":"run","tool":"bash-agent","prompt":"sleep 30","async_mode":true}"#,
)
.await
.unwrap();
}
let result = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"echo should-not-run"}"#)
.await
.unwrap();
assert!(
result.contains("Maximum 3 CLI agents already running"),
"Expected concurrent limit message, got: {}",
result
);
tool.handle_cancel_all("").await.unwrap();
}
#[tokio::test]
async fn test_working_dir_conflict_detection() {
let (tool, _db) = setup_bash_tool().await;
lock_claims(&tool.working_dir_claims).insert(
"/tmp/project".to_string(),
WorkingDirClaim {
task_id: "abc12345".to_string(),
tool_name: "bash-agent".to_string(),
prompt_summary: "sleep 60".to_string(),
dedup_prompt: make_dedup_prompt("sleep 60"),
},
);
let result = tool
.call(
r#"{"action":"run","tool":"bash-agent","prompt":"echo test","working_dir":"/tmp/project"}"#,
)
.await
.unwrap();
assert!(
result.contains("BLOCKED") && result.contains("Another CLI agent"),
"Expected working dir conflict BLOCKED message, got: {}",
result
);
}
#[tokio::test]
async fn test_working_dir_lock_released_after_completion() {
let (tool, _db) = setup_bash_tool().await;
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir_path = tmp_dir.path().to_str().unwrap();
let args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo done","working_dir":"{}"}}"#,
dir_path
);
let result = tool.call(&args).await.unwrap();
assert!(result.contains("done"));
let claims = lock_claims(&tool.working_dir_claims);
assert!(
!claims.contains_key(dir_path),
"Working-dir claim not released after completion"
);
}
#[tokio::test]
async fn test_working_dir_aliases_conflict_after_normalization() {
let (tool, _db) = setup_bash_tool().await;
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir = tmp_dir.path().to_string_lossy().to_string();
let dir_with_slash = format!("{}/", dir.trim_end_matches('/'));
let first_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"sleep 5","working_dir":"{}","async_mode":true}}"#,
dir
);
let first = tool.call(&first_args).await.unwrap();
assert!(
first.contains("started in background"),
"Expected first task to start in background, got: {}",
first
);
let second_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo test","working_dir":"{}"}}"#,
dir_with_slash
);
let second = tool.call(&second_args).await.unwrap();
assert!(
second.contains("BLOCKED") && second.contains("Another CLI agent"),
"Expected normalized path conflict BLOCKED message, got: {}",
second
);
tool.handle_cancel_all("").await.unwrap();
}
#[tokio::test]
async fn test_working_dir_lock_released_on_spawn_failure() {
let (tool, _db) = setup_bash_tool().await;
let missing_dir = "/tmp/aidaemon-cli-agent-missing-dir-lock-test";
let _ = std::fs::remove_dir_all(missing_dir);
let normalized = CliAgentTool::normalize_working_dir(missing_dir);
let args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo should-fail","working_dir":"{}"}}"#,
missing_dir
);
let result = tool.call(&args).await;
assert!(
result.is_err(),
"Expected spawn failure for missing current_dir, got: {:?}",
result
);
let claims = lock_claims(&tool.working_dir_claims);
assert!(
!claims.contains_key(&normalized),
"Working-dir claim should be released after spawn failure"
);
}
#[tokio::test]
async fn test_concurrent_limit_includes_sync_runs() {
let (mut tool, _db) = setup_bash_tool().await;
tool.max_concurrent = 1;
tool.concurrency_limiter = Arc::new(Semaphore::new(1));
let tool = Arc::new(tool);
let tool_for_first = Arc::clone(&tool);
let first = tokio::spawn(async move {
tool_for_first
.call(r#"{"action":"run","tool":"bash-agent","prompt":"sleep 2; echo first"}"#)
.await
});
tokio::time::sleep(Duration::from_millis(150)).await;
let second = tool
.call(r#"{"action":"run","tool":"bash-agent","prompt":"echo second"}"#)
.await
.unwrap();
assert!(
second.contains("Maximum 1 CLI agents already running"),
"Expected concurrency rejection while sync run is active, got: {}",
second
);
let first_result = first.await.unwrap().unwrap();
assert!(
first_result.contains("first"),
"Expected first sync run output, got: {}",
first_result
);
}
#[tokio::test]
async fn test_sync_inflight_conflict_reports_claim_metadata() {
let (tool, _db) = setup_bash_tool().await;
let tool = Arc::new(tool);
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir = tmp_dir.path().to_string_lossy().to_string();
let first_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"sleep 2; echo first","working_dir":"{}"}}"#,
dir
);
let tool_for_first = Arc::clone(&tool);
let first = tokio::spawn(async move { tool_for_first.call(&first_args).await.unwrap() });
tokio::time::sleep(Duration::from_millis(150)).await;
let second_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo second","working_dir":"{}"}}"#,
dir
);
let second = tool.call(&second_args).await.unwrap();
assert!(
second.contains("BLOCKED")
&& second.contains("task_id=")
&& second.contains("agent=bash-agent"),
"Expected conflict response with claim metadata, got: {}",
second
);
let first_result = first.await.unwrap();
assert!(
first_result.contains("first"),
"Expected first sync run output, got: {}",
first_result
);
}
#[tokio::test]
async fn test_sync_inflight_duplicate_prompt_blocked() {
let (tool, _db) = setup_bash_tool().await;
let tool = Arc::new(tool);
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir = tmp_dir.path().to_string_lossy().to_string();
let first_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo refactor the auth module to use OAuth tokens; sleep 2","working_dir":"{}"}}"#,
dir
);
let tool_for_first = Arc::clone(&tool);
let first = tokio::spawn(async move { tool_for_first.call(&first_args).await.unwrap() });
tokio::time::sleep(Duration::from_millis(150)).await;
let second_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo refactor the auth module to use OAuth tokens","working_dir":"{}"}}"#,
dir
);
let second = tool.call(&second_args).await.unwrap();
assert!(
second.contains("BLOCKED") && second.contains("similar task"),
"Expected duplicate-task block for sync in-flight run, got: {}",
second
);
let _ = first.await.unwrap();
}
#[tokio::test]
async fn test_working_dir_claim_released_on_semaphore_reject() {
let (mut tool, _db) = setup_bash_tool().await;
tool.max_concurrent = 0;
tool.concurrency_limiter = Arc::new(Semaphore::new(0));
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir_path = tmp_dir.path().to_str().unwrap();
let normalized = CliAgentTool::normalize_working_dir(dir_path);
let args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"echo blocked","working_dir":"{}"}}"#,
dir_path
);
let result = tool.call(&args).await.unwrap();
assert!(
result.contains("Maximum 0 CLI agents already running"),
"Expected semaphore rejection, got: {}",
result
);
let claims = lock_claims(&tool.working_dir_claims);
assert!(
!claims.contains_key(&normalized),
"Working-dir claim should be released after semaphore rejection"
);
}
#[tokio::test]
async fn test_working_dir_claim_released_when_dispatch_future_dropped() {
let (tool, _db) = setup_bash_tool().await;
let tool = Arc::new(tool);
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir = tmp_dir.path().to_string_lossy().to_string();
let normalized = CliAgentTool::normalize_working_dir(&dir);
let args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"sleep 15","working_dir":"{}"}}"#,
dir
);
let tool_for_task = Arc::clone(&tool);
let handle = tokio::spawn(async move { tool_for_task.call(&args).await });
let mut claimed = false;
for _ in 0..100 {
tokio::time::sleep(Duration::from_millis(50)).await;
if lock_claims(&tool.working_dir_claims).contains_key(&normalized) {
claimed = true;
break;
}
}
assert!(claimed, "Working-dir claim was never registered");
handle.abort();
let _ = handle.await;
assert!(
!lock_claims(&tool.working_dir_claims).contains_key(&normalized),
"Working-dir claim should be released when the dispatch future is dropped"
);
}
#[tokio::test]
async fn test_claim_guard_disarm_keeps_claim() {
let claims: Arc<WorkingDirClaims> = Arc::new(std::sync::Mutex::new(HashMap::new()));
lock_claims(&claims).insert(
"/tmp/project".to_string(),
WorkingDirClaim {
task_id: "abc12345".to_string(),
tool_name: "bash-agent".to_string(),
prompt_summary: "sleep 60".to_string(),
dedup_prompt: make_dedup_prompt("sleep 60"),
},
);
let mut guard = WorkingDirClaimGuard::new(
Arc::clone(&claims),
"/tmp/project".to_string(),
"abc12345".to_string(),
);
guard.disarm();
drop(guard);
assert!(
lock_claims(&claims).contains_key("/tmp/project"),
"Disarmed guard must not release the claim"
);
let other_guard = WorkingDirClaimGuard::new(
Arc::clone(&claims),
"/tmp/project".to_string(),
"other999".to_string(),
);
drop(other_guard);
assert!(
lock_claims(&claims).contains_key("/tmp/project"),
"Guard for a different task must not release someone else's claim"
);
let owner_guard = WorkingDirClaimGuard::new(
Arc::clone(&claims),
"/tmp/project".to_string(),
"abc12345".to_string(),
);
drop(owner_guard);
assert!(
!lock_claims(&claims).contains_key("/tmp/project"),
"Owning guard must release the claim on drop"
);
}
#[tokio::test]
async fn test_async_mode_returns_immediately() {
let (tool, _db) = setup_bash_tool().await;
let start = Instant::now();
let result = tool
.call(
r#"{"action":"run","tool":"bash-agent","prompt":"sleep 2; echo async-done","async_mode":true}"#,
)
.await
.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(1),
"Async mode took {:?} — not returning immediately",
elapsed
);
assert!(
result.contains("started in background"),
"Expected background message, got: {}",
result
);
assert!(
result.contains("task_id="),
"Expected task_id in response, got: {}",
result
);
}
#[tokio::test]
async fn test_async_mode_check_shows_result() {
let (tool, _db) = setup_bash_tool().await;
let result = tool
.call(
r#"{"action":"run","tool":"bash-agent","prompt":"echo async-check-test; sleep 5","async_mode":true}"#,
)
.await
.unwrap();
let task_id = result
.split("task_id=")
.nth(1)
.unwrap()
.split(')')
.next()
.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
let check_args = format!(r#"{{"action":"check","task_id":"{}"}}"#, task_id);
let check_result = tool.call(&check_args).await.unwrap();
assert!(
check_result.contains("async-check-test")
|| check_result.contains("still running")
|| check_result.contains("finished"),
"Expected task output or status, got: {}",
check_result
);
let cancel_args = format!(r#"{{"action":"cancel","task_id":"{}"}}"#, task_id);
tool.call(&cancel_args).await.unwrap();
}
#[tokio::test]
async fn test_cancel_nonexistent_task() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(r#"{"action":"cancel","task_id":"nonexistent"}"#)
.await
.unwrap();
assert!(
result.contains("No running CLI agent"),
"Expected not found message, got: {}",
result
);
}
#[tokio::test]
async fn test_list_empty() {
let (tool, _db) = setup_echo_tool().await;
let result = tool.call(r#"{"action":"list"}"#).await.unwrap();
assert!(
result.contains("No CLI agents currently running"),
"Expected empty list message, got: {}",
result
);
}
#[tokio::test]
async fn test_run_accepts_mission_and_task_aliases() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(
r#"{"action":"run","tool":"echo","mission":"Email helper","task":"Open Gmail and summarize the inbox"}"#,
)
.await
.unwrap();
assert!(
result.contains("Mission: Email helper"),
"Expected synthesized mission in output, got: {}",
result
);
assert!(
result.contains("Task: Open Gmail and summarize the inbox"),
"Expected synthesized task in output, got: {}",
result
);
}
#[tokio::test]
async fn test_run_recovers_prompt_from_task_and_goal_context() {
let (tool, _db) = setup_echo_tool().await;
let tmp_dir = tempfile::TempDir::new().unwrap();
let working_dir = tmp_dir.path().to_string_lossy().to_string();
let goal = Goal::new_finite(
"Review the latest aidaemon service logs and make the smallest safe fix",
"sess-ctx",
);
tool.state.create_goal(&goal).await.unwrap();
let task = Task {
id: uuid::Uuid::new_v4().to_string(),
goal_id: goal.id.clone(),
description: format!(
"Inspect the recent log failures and patch the root cause in {}",
working_dir
),
status: "claimed".to_string(),
priority: "high".to_string(),
task_order: 1,
parallel_group: None,
depends_on: None,
agent_id: Some("task-lead".to_string()),
context: None,
result: None,
error: None,
blocker: None,
idempotent: true,
retry_count: 0,
max_retries: 3,
created_at: chrono::Utc::now().to_rfc3339(),
started_at: None,
completed_at: None,
};
tool.state.create_task(&task).await.unwrap();
let result = tool
.call(&format!(
r#"{{"action":"run","tool":"echo","working_dir":"{}","_goal_id":"{}","_task_id":"{}"}}"#,
working_dir, goal.id, task.id
))
.await
.unwrap();
assert!(
result.contains(
"Mission: Review the latest aidaemon service logs and make the smallest safe fix"
),
"Expected synthesized mission from goal context, got: {}",
result
);
assert!(
result.contains(&format!(
"Task: Inspect the recent log failures and patch the root cause in {}",
working_dir
)),
"Expected synthesized task from task context, got: {}",
result
);
let updated_task = tool.state.get_task(&task.id).await.unwrap().unwrap();
assert_eq!(updated_task.status, "completed");
let context: serde_json::Value =
serde_json::from_str(updated_task.context.as_deref().unwrap()).unwrap();
assert_eq!(
context["executor_result"]["task_outcome"].as_str(),
Some("task_done")
);
let stored_scope = context["executor_handoff"]["target_scope"]["allowed_targets"][0]
["value"]
.as_str()
.unwrap();
assert!(
stored_scope.ends_with(tmp_dir.path().to_string_lossy().as_ref()),
"expected stored scope '{}' to end with '{}'",
stored_scope,
tmp_dir.path().to_string_lossy()
);
}
#[tokio::test]
async fn test_run_accepts_command_and_description_aliases() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.call(
r#"{"action":"run","tool":"echo","description":"Check test output","command":"pwd"}"#,
)
.await
.unwrap();
assert!(
result.contains("Check test output"),
"Expected synthesized description in output, got: {}",
result
);
assert!(
result.contains(
"Run this exact shell command in the working directory and report the result:\npwd"
),
"Expected synthesized command guidance in output, got: {}",
result
);
}
#[tokio::test]
async fn test_schema_includes_registered_tools() {
let (tool, _db) = setup_echo_tool().await;
let schema = tool.schema();
assert_eq!(schema["name"], "cli_agent");
assert!(schema["description"].as_str().unwrap().contains("echo"));
assert!(schema["parameters"]["properties"]["tool"]["enum"]
.as_array()
.unwrap()
.contains(&json!("echo")));
}
#[tokio::test]
async fn test_schema_updates_after_dynamic_add() {
let (tool, _db) = setup_echo_tool().await;
{
let mut tools = tool.tools.write().unwrap();
tools.insert(
"new-tool".to_string(),
CliToolEntry {
command: "echo".to_string(),
args: vec![],
description: "Newly added".to_string(),
timeout: Duration::from_secs(10),
max_output_chars: 10000,
is_dynamic: true,
},
);
let mut names = tool.tool_names.write().unwrap();
names.push("new-tool".to_string());
names.sort();
}
let schema = tool.schema();
let enum_vals = schema["parameters"]["properties"]["tool"]["enum"]
.as_array()
.unwrap();
assert!(
enum_vals.contains(&json!("echo")) && enum_vals.contains(&json!("new-tool")),
"Schema should include both tools: {:?}",
enum_vals
);
}
#[tokio::test]
async fn test_add_agent_with_echo() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.add_agent("my-echo", "echo", vec![], "Custom echo", None, None)
.await
.unwrap();
assert!(
result.contains("added successfully"),
"Expected success, got: {}",
result
);
let agents = tool.list_agents();
assert!(
agents.iter().any(|(name, _, _, _)| name == "my-echo"),
"Expected my-echo in agent list"
);
}
#[tokio::test]
async fn test_add_agent_nonexistent_command() {
let (tool, _db) = setup_echo_tool().await;
let result = tool
.add_agent(
"fake",
"aidaemon-nonexistent-cmd-xyz",
vec![],
"",
None,
None,
)
.await
.unwrap();
assert!(
result.contains("not found"),
"Expected not found error, got: {}",
result
);
}
#[tokio::test]
async fn test_remove_agent() {
let (tool, _db) = setup_echo_tool().await;
tool.add_agent("removeme", "echo", vec![], "", None, None)
.await
.unwrap();
let result = tool.remove_agent("removeme").await.unwrap();
assert!(result.contains("removed"));
let agents = tool.list_agents();
assert!(
!agents.iter().any(|(name, _, _, _)| name == "removeme"),
"Agent should have been removed"
);
}
#[tokio::test]
async fn test_remove_nonexistent_agent() {
let (tool, _db) = setup_echo_tool().await;
let result = tool.remove_agent("nonexistent").await.unwrap();
assert!(result.contains("not found"));
}
/// `detect_auth_error` flags typical authentication failures in agent output
/// and stays silent on ordinary output.
///
/// `detect_auth_error` is synchronous, so this is a plain `#[test]`; no async
/// runtime is needed.
#[test]
fn test_detect_auth_error_patterns() {
    let cases = [
        ("Error: authentication required", true),
        ("401 Unauthorized", true),
        ("Token expired, please re-authenticate", true),
        ("Login required to continue", true),
        ("Invalid API key provided", true),
        ("Access denied: forbidden", true),
        ("Invalid token for this resource", true),
        ("Normal output: everything is fine", false),
        ("Compiling project...", false),
    ];
    for (output, should_detect) in cases {
        let result = CliAgentTool::detect_auth_error(output, "test-agent");
        match (should_detect, result) {
            (true, Some(message)) => assert!(
                message.contains("authentication failed"),
                "Unexpected auth error message for {:?}: {}",
                output,
                message
            ),
            (true, None) => panic!("Expected auth error detection for: {}", output),
            // Include the produced message so a false positive is easy to diagnose.
            (false, Some(message)) => panic!(
                "False positive auth detection for: {} (got: {})",
                output, message
            ),
            (false, None) => {}
        }
    }
}
/// Twenty distinct lines must never be reported as a loop.
#[test]
fn test_loop_detector_no_false_positive() {
    let mut detector = LoopDetector::new();
    (0..20).for_each(|n| {
        let line = format!("unique line {}", n);
        assert!(!detector.add_line(&line));
    });
}
/// Repeating one line `LOOP_DETECTION_THRESHOLD + 1` times must trip the
/// detector. A detection counts only from index `LOOP_DETECTION_THRESHOLD - 1`
/// onward, and feeding stops at the first counted detection.
#[test]
fn test_loop_detector_catches_repetition() {
    let mut detector = LoopDetector::new();
    let earliest = LOOP_DETECTION_THRESHOLD - 1;
    // `any` short-circuits exactly like an early `return` from the loop.
    let triggered = (0..=LOOP_DETECTION_THRESHOLD).any(|idx| {
        let detected = detector.add_line("stuck in a loop");
        idx >= earliest && detected
    });
    assert!(triggered, "Loop detector should have triggered");
}
/// Blank and whitespace-only lines never count toward loop detection, no
/// matter how often they repeat.
#[test]
fn test_loop_detector_ignores_empty_lines() {
    let mut detector = LoopDetector::new();
    for _ in 0..200 {
        for blank in ["", " "] {
            assert!(!detector.add_line(blank));
        }
    }
}
/// Short non-JSON output is returned unchanged.
#[test]
fn test_extract_meaningful_output_plain_text() {
    let plain = "Hello, this is plain text output\nLine 2\n";
    assert_eq!(extract_meaningful_output(plain, 10000), plain);
}
/// A JSON object with a `result` field is reduced to that field's text.
#[test]
fn test_extract_meaningful_output_json_result() {
    let payload = r#"{"result": "The task is complete. Created 3 files."}"#;
    let expected = "The task is complete. Created 3 files.";
    assert_eq!(extract_meaningful_output(payload, 10000), expected);
}
/// A JSON object with an `output` field is reduced to that field's text.
#[test]
fn test_extract_meaningful_output_json_output_field() {
    let payload = r#"{"output": "Generated report successfully"}"#;
    let expected = "Generated report successfully";
    assert_eq!(extract_meaningful_output(payload, 10000), expected);
}
/// Oversized output is cut down and marked as truncated.
#[test]
fn test_extract_meaningful_output_truncation() {
    let output = "a".repeat(20000);
    // NOTE(review): the second argument appears to be the output budget; the
    // 200-byte bound leaves headroom for the appended truncation note.
    let result = extract_meaningful_output(&output, 100);
    assert!(
        result.len() <= 200,
        "Truncated output too long ({} bytes): {}",
        result.len(),
        result
    );
    assert!(
        result.contains("truncated"),
        "Expected a truncation note, got: {}",
        result
    );
}
/// A `Read` tool-use event yields progress text mentioning the file read.
#[test]
fn test_extract_progress_from_json_tool_use() {
    let json = r#"{"name":"Read","input":{"file_path":"/src/main.rs"}}"#;
    let progress = extract_progress_from_json(json)
        .expect("Expected progress for Read tool-use event");
    assert!(
        progress.contains("main.rs"),
        "Expected file name in progress, got: {}",
        progress
    );
}
/// A `Bash` tool-use event yields progress text containing the command.
#[test]
fn test_extract_progress_from_json_bash_command() {
    let json = r#"{"name":"Bash","input":{"command":"npm install"}}"#;
    let progress = extract_progress_from_json(json)
        .expect("Expected progress for Bash tool-use event");
    assert!(
        progress.contains("npm install"),
        "Expected command in progress, got: {}",
        progress
    );
}
/// Plain text is not a JSON event, so no progress is extracted.
#[test]
fn test_extract_progress_from_json_non_json() {
    let progress = extract_progress_from_json("This is just regular text");
    assert!(progress.is_none());
}
/// Objects and arrays (even with surrounding whitespace) look like JSON;
/// plain text and the empty string do not.
#[test]
fn test_looks_like_json() {
    let json_like = [r#"{"key": "value"}"#, r#"[1, 2, 3]"#, r#" {"indented": true} "#];
    let not_json = ["plain text", ""];
    for sample in json_like {
        assert!(looks_like_json(sample));
    }
    for sample in not_json {
        assert!(!looks_like_json(sample));
    }
}
/// `CliAgentTool::discover` registers a configured tool whose command exists
/// (`echo`), and the tool appears in `list_agents`.
#[tokio::test]
async fn test_discover_finds_echo() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let embeddings = Arc::new(EmbeddingService::new().unwrap());
    let store = SqliteStateStore::new(db_file.path().to_str().unwrap(), 100, None, embeddings)
        .await
        .unwrap();
    let state: Arc<dyn StateStore> = Arc::new(store);
    let provider: Arc<dyn crate::traits::ModelProvider> = Arc::new(MockProvider::new());
    let (raw_approval_tx, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
    let approval_tx = ApprovalBroker::new(raw_approval_tx);

    let echo_config = crate::config::CliToolConfig {
        command: "echo".to_string(),
        args: vec![],
        description: "Echo for test".to_string(),
        timeout_secs: None,
        max_output_chars: None,
    };
    let config = CliAgentsConfig {
        enabled: true,
        timeout_secs: 30,
        max_output_chars: 10000,
        tools: HashMap::from([("echo".to_string(), echo_config)]),
    };
    let runtime = SharedLlmRuntime::new(
        provider,
        None,
        crate::config::ProviderKind::OpenaiCompatible,
        "mock".to_string(),
    );

    let tool = CliAgentTool::discover(config, state, runtime, approval_tx).await;
    assert!(tool.has_tools());
    let found = tool
        .list_agents()
        .iter()
        .any(|(name, _, _, _)| name == "echo");
    assert!(found);
}
/// `CliAgentTool::discover` silently drops a configured tool whose command
/// cannot be found, leaving the tool with nothing registered.
#[tokio::test]
async fn test_discover_skips_nonexistent() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let embeddings = Arc::new(EmbeddingService::new().unwrap());
    let store = SqliteStateStore::new(db_file.path().to_str().unwrap(), 100, None, embeddings)
        .await
        .unwrap();
    let state: Arc<dyn StateStore> = Arc::new(store);
    let provider: Arc<dyn crate::traits::ModelProvider> = Arc::new(MockProvider::new());
    let (raw_approval_tx, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
    let approval_tx = ApprovalBroker::new(raw_approval_tx);

    let bogus_config = crate::config::CliToolConfig {
        command: "aidaemon-nonexistent-12345".to_string(),
        args: vec![],
        description: "".to_string(),
        timeout_secs: None,
        max_output_chars: None,
    };
    let config = CliAgentsConfig {
        enabled: true,
        timeout_secs: 30,
        max_output_chars: 10000,
        tools: HashMap::from([("fake-tool".to_string(), bogus_config)]),
    };
    let runtime = SharedLlmRuntime::new(
        provider,
        None,
        crate::config::ProviderKind::OpenaiCompatible,
        "mock".to_string(),
    );

    let tool = CliAgentTool::discover(config, state, runtime, approval_tx).await;
    assert!(!tool.has_tools());
}
/// A synchronous `run` records an invocation row with the agent name, a
/// prompt summary, a success flag and a duration.
#[tokio::test]
async fn test_run_logs_invocation() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let embeddings = Arc::new(EmbeddingService::new().unwrap());
    let store = Arc::new(
        SqliteStateStore::new(db_file.path().to_str().unwrap(), 100, None, embeddings)
            .await
            .unwrap(),
    );
    // Second handle kept so invocations can be queried after `store` moves into the tool.
    let store_handle = Arc::clone(&store);
    let provider = Arc::new(MockProvider::new());
    let (raw_approval_tx, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
    let approval_tx = ApprovalBroker::new(raw_approval_tx);

    let echo_entry = CliToolEntry {
        command: "echo".to_string(),
        args: vec![],
        description: "".to_string(),
        timeout: Duration::from_secs(10),
        max_output_chars: 10000,
        is_dynamic: false,
    };
    let runtime = SharedLlmRuntime::new(
        provider as Arc<dyn crate::traits::ModelProvider>,
        None,
        crate::config::ProviderKind::OpenaiCompatible,
        "mock".to_string(),
    );
    let tool = CliAgentTool {
        tools: Arc::new(std::sync::RwLock::new(HashMap::from([(
            "echo".to_string(),
            echo_entry,
        )]))),
        tool_names: Arc::new(std::sync::RwLock::new(vec!["echo".to_string()])),
        running: Arc::new(Mutex::new(HashMap::new())),
        completed: Arc::new(Mutex::new(HashMap::new())),
        working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
        state: store as Arc<dyn StateStore>,
        llm_runtime: runtime,
        default_timeout: Duration::from_secs(10),
        default_max_output: 10000,
        max_concurrent: 3,
        concurrency_limiter: Arc::new(Semaphore::new(3)),
        approval_tx,
        hub: OnceLock::new(),
    };

    tool.call(r#"{"action":"run","tool":"echo","prompt":"log test","_session_id":"sess1"}"#)
        .await
        .unwrap();

    let invocations = store_handle.get_cli_agent_invocations(10).await.unwrap();
    assert!(
        !invocations.is_empty(),
        "Expected at least one invocation logged"
    );
    let latest = &invocations[0];
    assert_eq!(latest.agent_name, "echo");
    assert!(latest.prompt_summary.contains("log test"));
    assert_eq!(latest.success, Some(true));
    assert!(latest.duration_secs.is_some());
}
#[tokio::test]
async fn test_timeout_run_still_logs_completion() {
let db_file = tempfile::NamedTempFile::new().unwrap();
let embedding_service = Arc::new(EmbeddingService::new().unwrap());
let state = Arc::new(
SqliteStateStore::new(
db_file.path().to_str().unwrap(),
100,
None,
embedding_service,
)
.await
.unwrap(),
);
let state_clone = state.clone();
let provider = Arc::new(MockProvider::new());
let (approval_tx_raw, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
let approval_tx = ApprovalBroker::new(approval_tx_raw);
let mut tools_map = HashMap::new();
tools_map.insert(
"bash".to_string(),
CliToolEntry {
command: "bash".to_string(),
args: vec!["-c".to_string()],
description: "".to_string(),
timeout: Duration::from_millis(50),
max_output_chars: 10000,
is_dynamic: false,
},
);
let tool = CliAgentTool {
tools: Arc::new(std::sync::RwLock::new(tools_map)),
tool_names: Arc::new(std::sync::RwLock::new(vec!["bash".to_string()])),
running: Arc::new(Mutex::new(HashMap::new())),
completed: Arc::new(Mutex::new(HashMap::new())),
working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
state: state as Arc<dyn StateStore>,
llm_runtime: SharedLlmRuntime::new(
provider as Arc<dyn crate::traits::ModelProvider>,
None,
crate::config::ProviderKind::OpenaiCompatible,
"mock".to_string(),
),
default_timeout: Duration::from_secs(10),
default_max_output: 10000,
max_concurrent: 3,
concurrency_limiter: Arc::new(Semaphore::new(3)),
approval_tx,
hub: OnceLock::new(),
};
let resp = tool
.call(
r#"{"action":"run","tool":"bash","prompt":"sleep 0.2; echo done","_session_id":"sess1"}"#,
)
.await
.unwrap();
assert!(
resp.contains("Moved to background") || resp.contains("still running"),
"expected background/timeout response, got: {}",
resp
);
tokio::time::sleep(Duration::from_millis(500)).await;
let invocations = state_clone.get_cli_agent_invocations(10).await.unwrap();
assert!(!invocations.is_empty(), "Expected invocation logged");
assert_eq!(invocations[0].agent_name, "bash");
assert_eq!(
invocations[0].success,
Some(true),
"Expected background invocation to be marked successful"
);
assert!(
invocations[0].completed_at.is_some(),
"Expected completed_at to be set"
);
assert!(
invocations[0].duration_secs.unwrap_or(0.0) > 0.0,
"Expected a positive duration"
);
}
#[tokio::test]
async fn test_background_delegated_run_persists_structured_task_result() {
let (tool, _db) = setup_bash_tool().await;
{
let mut tools = tool.tools.write().unwrap();
tools.get_mut("bash-agent").unwrap().timeout = Duration::from_millis(50);
}
let goal = Goal::new_finite("Patch the current repo safely", "sess-bg");
tool.state.create_goal(&goal).await.unwrap();
let task = Task {
id: uuid::Uuid::new_v4().to_string(),
goal_id: goal.id.clone(),
description: "Run a delegated CLI task in the background".to_string(),
status: "claimed".to_string(),
priority: "high".to_string(),
task_order: 1,
parallel_group: None,
depends_on: None,
agent_id: Some("task-lead".to_string()),
context: None,
result: None,
error: None,
blocker: None,
idempotent: true,
retry_count: 0,
max_retries: 3,
created_at: chrono::Utc::now().to_rfc3339(),
started_at: None,
completed_at: None,
};
tool.state.create_task(&task).await.unwrap();
let resp = tool
.call(&format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"sleep 0.2; echo delegated-background-ok","_session_id":"sess-bg","_goal_id":"{}","_task_id":"{}"}}"#,
goal.id, task.id
))
.await
.unwrap();
assert!(
resp.contains("Moved to background") || resp.contains("still running"),
"expected timeout/background handoff, got: {}",
resp
);
tokio::time::sleep(Duration::from_millis(700)).await;
let updated_task = tool.state.get_task(&task.id).await.unwrap().unwrap();
assert_eq!(updated_task.status, "completed");
let context: serde_json::Value =
serde_json::from_str(updated_task.context.as_deref().unwrap()).unwrap();
assert_eq!(
context["executor_result"]["task_outcome"].as_str(),
Some("task_done")
);
assert!(
updated_task
.result
.as_deref()
.unwrap_or("")
.contains("delegated-background-ok"),
"expected delegated background result to be persisted, got: {:?}",
updated_task.result
);
}
/// After a timed-out run finishes in the background, a `check` on its task id
/// returns the captured output and reports it as finished.
#[tokio::test]
async fn test_background_cli_check_returns_result_after_reap() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let embeddings = Arc::new(EmbeddingService::new().unwrap());
    let store = Arc::new(
        SqliteStateStore::new(db_file.path().to_str().unwrap(), 100, None, embeddings)
            .await
            .unwrap(),
    );
    let provider = Arc::new(MockProvider::new());
    let (raw_approval_tx, _approval_rx) = tokio::sync::mpsc::channel::<ApprovalRequest>(1);
    let approval_tx = ApprovalBroker::new(raw_approval_tx);

    // 50 ms timeout so the 200 ms script below is forced into the background.
    let bash_entry = CliToolEntry {
        command: "bash".to_string(),
        args: vec!["-c".to_string()],
        description: "".to_string(),
        timeout: Duration::from_millis(50),
        max_output_chars: 10000,
        is_dynamic: false,
    };
    let runtime = SharedLlmRuntime::new(
        provider as Arc<dyn crate::traits::ModelProvider>,
        None,
        crate::config::ProviderKind::OpenaiCompatible,
        "mock".to_string(),
    );
    let tool = CliAgentTool {
        tools: Arc::new(std::sync::RwLock::new(HashMap::from([(
            "bash".to_string(),
            bash_entry,
        )]))),
        tool_names: Arc::new(std::sync::RwLock::new(vec!["bash".to_string()])),
        running: Arc::new(Mutex::new(HashMap::new())),
        completed: Arc::new(Mutex::new(HashMap::new())),
        working_dir_claims: Arc::new(std::sync::Mutex::new(HashMap::new())),
        state: store as Arc<dyn StateStore>,
        llm_runtime: runtime,
        default_timeout: Duration::from_secs(10),
        default_max_output: 10000,
        max_concurrent: 3,
        concurrency_limiter: Arc::new(Semaphore::new(3)),
        approval_tx,
        hub: OnceLock::new(),
    };

    let handoff = tool
        .call(
            r#"{"action":"run","tool":"bash","prompt":"sleep 0.2; echo cli-reap-ok","_session_id":"sess1"}"#,
        )
        .await
        .unwrap();
    let task_id = extract_task_id_from_background_message(&handoff);
    tokio::time::sleep(Duration::from_millis(500)).await;

    let check_request = format!(
        r#"{{"action":"check","task_id":"{}","_session_id":"sess1"}}"#,
        task_id
    );
    let status = tool.call(&check_request).await.unwrap();
    assert!(status.contains("cli-reap-ok"));
    assert!(status.contains("finished"));
}
/// `capture_git_diff` yields nothing for a directory that is not a git repo.
#[tokio::test]
async fn test_capture_git_diff_no_repo() {
    let scratch = tempfile::TempDir::new().unwrap();
    let path = scratch.path().to_str().unwrap();
    let diff = CliAgentTool::capture_git_diff(path).await;
    assert!(diff.is_none(), "Non-git directory should return None");
}
/// `capture_git_diff` reports uncommitted working-tree changes in a repo that
/// has at least one commit.
#[tokio::test]
async fn test_capture_git_diff_with_changes() {
    /// Runs `git <args>` inside `dir` and fails the test (with git's stderr) if
    /// the command cannot be spawned or exits unsuccessfully, so a broken
    /// fixture is reported directly instead of as a confusing diff mismatch.
    async fn git(dir: &str, args: &[&str]) {
        let output = tokio::process::Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .await
            .expect("failed to spawn git");
        assert!(
            output.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    let tmp = tempfile::TempDir::new().unwrap();
    let dir = tmp.path().to_str().unwrap();
    git(dir, &["init"]).await;
    git(dir, &["config", "user.email", "test@test.com"]).await;
    git(dir, &["config", "user.name", "Test"]).await;
    // A developer's global `commit.gpgsign = true` would make the fixture commit fail.
    git(dir, &["config", "commit.gpgsign", "false"]).await;
    std::fs::write(tmp.path().join("file.txt"), "initial").unwrap();
    git(dir, &["add", "."]).await;
    git(dir, &["commit", "-m", "initial"]).await;
    std::fs::write(tmp.path().join("file.txt"), "modified content").unwrap();

    let diff = CliAgentTool::capture_git_diff(dir)
        .await
        .expect("Should capture uncommitted changes");
    assert!(
        diff.contains("modified content") || diff.contains("file.txt"),
        "Diff should mention the changed file, got: {}",
        diff
    );
}
/// An unrecognized `action` value returns an "Unknown action" message.
#[tokio::test]
async fn test_unknown_action() {
    let (tool, _db) = setup_echo_tool().await;
    let reply = tool.call(r#"{"action":"invalid_action"}"#).await.unwrap();
    assert!(reply.contains("Unknown action"));
}
/// `has_tools` and `Tool::is_available` both follow the tool registry: true
/// while an agent is registered, false once the registry is cleared.
#[tokio::test]
async fn test_has_tools() {
    let (tool, _db) = setup_echo_tool().await;
    let assert_availability = |expected: bool| {
        assert_eq!(tool.has_tools(), expected);
        assert_eq!(Tool::is_available(&tool), expected);
    };
    assert_availability(true);
    tool.tools.write().unwrap().clear();
    assert_availability(false);
}
/// With every agent removed, a `run` that names no tool must return an error
/// mentioning that no CLI agents are available.
#[tokio::test]
async fn test_run_without_tool_and_no_agents_errors() {
    let (tool, _db) = setup_echo_tool().await;
    tool.tools.write().unwrap().clear();
    tool.tool_names.write().unwrap().clear();
    let err = tool
        .call(r#"{"action":"run","prompt":"test"}"#)
        .await
        .expect_err("run with no agents configured should return an error")
        .to_string();
    assert!(
        err.contains("No CLI agents available"),
        "Expected no-agents error, got: {}",
        err
    );
}
/// The enriched prompt embeds both the system instruction and the task, under
/// `## Task` / `## Instructions` headings.
#[tokio::test]
async fn test_build_enriched_prompt_basic() {
    let (tool, _db) = setup_echo_tool().await;
    let instruction = "You are a security auditor";
    let task = "Audit this codebase";
    let prompt = tool
        .build_enriched_prompt("test-session", instruction, task, None)
        .await;
    for fragment in [instruction, task, "## Task", "## Instructions"] {
        assert!(prompt.contains(fragment));
    }
}
/// With an empty system instruction the enriched prompt still carries the
/// task under a `## Task` heading.
#[tokio::test]
async fn test_build_enriched_prompt_no_instruction() {
    let (tool, _db) = setup_echo_tool().await;
    let task = "Just do the task";
    let prompt = tool
        .build_enriched_prompt("test-session", "", task, None)
        .await;
    for fragment in [task, "## Task"] {
        assert!(prompt.contains(fragment));
    }
}
/// A realistic user request, echoed through the bash agent, completes quickly
/// and returns the script's output (guards against hangs in the run path).
#[tokio::test]
async fn test_user_prompt_website_about_cars() {
    let (tool, _db) = setup_bash_tool().await;
    let user_prompt = "I need to create a new website about cars. We should push it to cars.example.com. make it modern.";
    let timer = Instant::now();
    let script = format!(
        "echo 'Received prompt: {}'; echo 'Task complete'",
        user_prompt
    );
    let request = serde_json::json!({
        "action": "run",
        "tool": "bash-agent",
        "prompt": script,
        "_session_id": "telegram_12345"
    })
    .to_string();
    let output = tool.call(&request).await.unwrap();
    let took = timer.elapsed();
    assert!(
        took < Duration::from_secs(5),
        "Took {:?} — should complete quickly, not hang",
        took
    );
    assert!(
        output.contains("Task complete"),
        "Expected output, got: {}",
        output
    );
}
/// Supplying a `system_instruction` (which takes the enriched-prompt path)
/// still finishes quickly and returns the script's output.
#[tokio::test]
async fn test_user_prompt_with_system_instruction() {
    let (tool, _db) = setup_bash_tool().await;
    let timer = Instant::now();
    let request = serde_json::json!({
        "action": "run",
        "tool": "bash-agent",
        "prompt": "echo 'Building website...'; echo 'Created index.html'; echo 'Done'",
        "system_instruction": "You are a senior web developer. Create a modern, responsive website.",
        "_session_id": "telegram_12345"
    })
    .to_string();
    let output = tool.call(&request).await.unwrap();
    let took = timer.elapsed();
    assert!(
        took < Duration::from_secs(5),
        "Enriched prompt flow took {:?}",
        took
    );
    assert!(output.contains("Done"), "Expected output, got: {}", output);
}
/// When the agent prints Claude-style stream-json events, the returned output
/// contains the text of the final `result` event.
#[tokio::test]
async fn test_claude_stream_json_output_parsing() {
    let (tool, _db) = setup_bash_tool().await;
    // Each `echo` emits one stream-json event line.
    let stream_json = r#"
echo '{"type":"assistant","message":{"content":[{"type":"text","text":"I will create the website."}]}}'
echo '{"type":"tool_use","name":"Bash","input":{"command":"mkdir -p website"}}'
echo '{"type":"tool_result","content":"Directory created"}'
echo '{"type":"result","result":"Website created successfully with index.html, style.css, and script.js"}'
"#;
    let request = serde_json::json!({
        "action": "run",
        "tool": "bash-agent",
        "prompt": stream_json.trim()
    })
    .to_string();
    let output = tool.call(&request).await.unwrap();
    assert!(
        output.contains("Website created successfully"),
        "Should extract result from JSON output, got: {}",
        output
    );
}
/// Progress is extracted from a nested assistant `tool_use` event, and a bare
/// `thinking` event maps to the fixed "Thinking..." status.
///
/// `extract_progress_from_json` is synchronous, so this is a plain `#[test]`.
#[test]
fn test_progress_extraction_from_claude_stream() {
    let assistant_json = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Bash","input":{"command":"npm install react"}}]}}"#;
    let progress = extract_progress_from_json(assistant_json)
        .expect("Should extract progress from assistant tool_use event");
    assert!(
        progress.contains("npm install react"),
        "Should include the command, got: {}",
        progress
    );
    let thinking_json = r#"{"type":"thinking"}"#;
    assert_eq!(
        extract_progress_from_json(thinking_json),
        Some("💭 Thinking...".to_string())
    );
}
/// A failing command (non-zero exit) yields an ERROR result that includes what
/// the command wrote to stderr.
#[tokio::test]
async fn test_stderr_captured_in_error_output() {
    let (tool, _db) = setup_bash_tool().await;
    let script =
        "echo 'some stdout'; echo 'error detail 1' >&2; echo 'error detail 2' >&2; exit 1";
    let request = serde_json::json!({
        "action": "run",
        "tool": "bash-agent",
        "prompt": script
    })
    .to_string();
    let output = tool.call(&request).await.unwrap();
    assert!(output.contains("ERROR"));
    assert!(
        output.contains("error detail 1"),
        "Should capture stderr, got: {}",
        output
    );
}
/// The `working_dir` argument becomes the agent process's current directory,
/// as observed via `pwd`.
#[tokio::test]
async fn test_run_with_working_dir() {
    let (tool, _db) = setup_bash_tool().await;
    let workdir = tempfile::TempDir::new().unwrap();
    let workdir_path = workdir.path().to_str().unwrap();
    let request = serde_json::json!({
        "action": "run",
        "tool": "bash-agent",
        "prompt": "pwd",
        "working_dir": workdir_path
    })
    .to_string();
    let output = tool.call(&request).await.unwrap();
    assert!(
        output.contains(workdir_path),
        "CLI agent should run in specified working dir, got: {}",
        output
    );
}
/// `prompt_similarity` checks: identical prompts score 1.0, related prompts
/// score above 0.5, unrelated ones below 0.3. Edge cases cover empty and
/// single-word inputs.
#[test]
fn test_prompt_similarity_scores() {
    let approx = |actual: f64, expected: f64| (actual - expected).abs() < f64::EPSILON;

    assert!(approx(
        prompt_similarity("refactor the auth module", "refactor the auth module"),
        1.0
    ));

    let related = prompt_similarity(
        "refactor the auth module to use JWT",
        "refactor the auth module to use OAuth",
    );
    assert!(
        related > 0.5,
        "Similar prompts should score > 0.5, got: {:.3}",
        related
    );

    let unrelated = prompt_similarity("refactor the auth module", "deploy to production server");
    assert!(
        unrelated < 0.3,
        "Different prompts should score < 0.3, got: {:.3}",
        unrelated
    );

    assert!(approx(prompt_similarity("", ""), 1.0));
    assert!(approx(prompt_similarity("hello", ""), 0.0));
    assert!(approx(prompt_similarity("deploy", "deploy"), 1.0));
    assert!(approx(prompt_similarity("deploy", "refactor"), 0.0));
}
#[tokio::test]
async fn test_duplicate_task_detection() {
let (tool, _db) = setup_bash_tool().await;
let tmp_dir = tempfile::TempDir::new().unwrap();
let dir = tmp_dir.path().to_string_lossy().to_string();
let first_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"refactor the auth module to use JWT tokens","working_dir":"{}","async_mode":true}}"#,
dir
);
let first = tool.call(&first_args).await.unwrap();
assert!(
first.contains("started in background"),
"Expected first task to start, got: {}",
first
);
let second_args = format!(
r#"{{"action":"run","tool":"bash-agent","prompt":"refactor the auth module to use OAuth tokens","working_dir":"{}"}}"#,
dir
);
let second = tool.call(&second_args).await.unwrap();
assert!(
second.contains("BLOCKED") && second.contains("similar task"),
"Expected duplicate task BLOCKED message, got: {}",
second
);
tool.handle_cancel_all("").await.unwrap();
}
}