use super::{PlanDecision, Tool, ToolResult, parse_tool_args, schema_to_tool_params};
use crate::command::chat::constants::{
BG_TASK_CMD_DISPLAY_MAX_CHARS, BG_TASK_DEFAULT_TIMEOUT_MS, BG_TASK_MAX_TIMEOUT_MS,
};
use crate::util::safe_lock;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, atomic::AtomicBool};
use std::time::{Duration, Instant};
pub(super) struct BgTask {
pub task_id: String,
pub command: String,
pub status: String, pub output_buffer: Arc<Mutex<String>>,
pub result: Option<String>,
pub started_at: Instant,
}
#[derive(Debug)]
pub struct BgNotification {
pub task_id: String,
pub command: String,
pub status: String,
pub result: String,
}
pub struct BackgroundManager {
pub(super) tasks: Mutex<HashMap<String, BgTask>>,
notifications: Mutex<Vec<BgNotification>>,
next_id: Mutex<u64>,
}
impl std::fmt::Debug for BackgroundManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let tasks_count = self.tasks.lock().map_or(0, |t| t.len());
let notif_count = self.notifications.lock().map_or(0, |n| n.len());
let next_id = self.next_id.lock().map_or(0, |id| *id);
f.debug_struct("BackgroundManager")
.field("tasks_count", &tasks_count)
.field("notifications_count", ¬if_count)
.field("next_id", &next_id)
.finish()
}
}
impl Default for BackgroundManager {
fn default() -> Self {
Self::new()
}
}
impl BackgroundManager {
pub fn new() -> Self {
Self {
tasks: Mutex::new(HashMap::new()),
notifications: Mutex::new(Vec::new()),
next_id: Mutex::new(1),
}
}
fn gen_id(&self) -> String {
let mut id = safe_lock(&self.next_id, "BackgroundManager::gen_id");
let current = *id;
*id += 1;
format!("bg_{}", current)
}
pub fn spawn_command(
&self,
command: &str,
_cwd: Option<String>,
_timeout_secs: u64,
) -> (String, Arc<Mutex<String>>) {
let task_id = self.gen_id();
let output_buffer = Arc::new(Mutex::new(String::new()));
let bg_task = BgTask {
task_id: task_id.clone(),
command: command.to_string(),
status: "running".to_string(),
output_buffer: Arc::clone(&output_buffer),
result: None,
started_at: Instant::now(),
};
{
let mut tasks = safe_lock(&self.tasks, "BackgroundManager::spawn_command");
tasks.insert(task_id.clone(), bg_task);
}
(task_id, output_buffer)
}
pub fn complete_task(&self, task_id: &str, status: &str, result: String) {
let command;
{
let mut tasks = safe_lock(&self.tasks, "BackgroundManager::complete_task");
if let Some(task) = tasks.get_mut(task_id) {
task.status = status.to_string();
task.result = Some(result.clone());
command = task.command.clone();
} else {
return;
}
}
{
let mut notifs = safe_lock(&self.notifications, "BackgroundManager::complete_notify");
notifs.push(BgNotification {
task_id: task_id.to_string(),
command,
status: status.to_string(),
result,
});
}
}
pub fn drain_notifications(&self) -> Vec<BgNotification> {
let mut notifs = safe_lock(
&self.notifications,
"BackgroundManager::drain_notifications",
);
std::mem::take(&mut *notifs)
}
pub fn get_task_status(&self, task_id: &str) -> Option<Value> {
let tasks = safe_lock(&self.tasks, "BackgroundManager::get_task_status");
tasks.get(task_id).map(|t| {
let output = {
let buf = safe_lock(&t.output_buffer, "BgTask::output_buffer");
if buf.is_empty() {
t.result.clone()
} else {
Some(buf.clone())
}
};
json!({
"task_id": t.task_id,
"command": t.command,
"status": t.status,
"output": output,
})
})
}
pub fn is_running(&self, task_id: &str) -> bool {
let tasks = safe_lock(&self.tasks, "BackgroundManager::is_running");
tasks
.get(task_id)
.map(|t| t.status == "running")
.unwrap_or(false)
}
pub fn list_running(&self) -> Vec<(String, String, u64)> {
let tasks = safe_lock(&self.tasks, "BackgroundManager::list_running");
let now = Instant::now();
let mut out: Vec<_> = tasks
.values()
.filter(|t| t.status == "running")
.map(|t| {
let elapsed = now.duration_since(t.started_at).as_secs();
let cmd_summary = if t.command.chars().count() > 80 {
let truncated: String = t
.command
.chars()
.take(BG_TASK_CMD_DISPLAY_MAX_CHARS)
.collect();
format!("{}...", truncated)
} else {
t.command.clone()
};
(t.task_id.clone(), cmd_summary, elapsed)
})
.collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
out
}
}
pub fn build_running_summary(manager: &Arc<BackgroundManager>) -> String {
let running = manager.list_running();
if running.is_empty() {
return String::new();
}
let mut md = String::from(
"## Background Tasks\n\n\
The following background tasks are still running. \
Use TaskOutput to wait for or check their results when needed. \
Do not re-spawn these commands.\n",
);
for (id, cmd, elapsed) in &running {
md.push_str(&format!(
"- {} (running {}): {}\n",
id,
format_elapsed(*elapsed),
cmd
));
}
md.trim_end().to_string()
}
fn format_elapsed(secs: u64) -> String {
if secs < 60 {
format!("{}s", secs)
} else if secs < 3600 {
format!("{}m{}s", secs / 60, secs % 60)
} else {
format!("{}h{}m", secs / 3600, (secs % 3600) / 60)
}
}
#[derive(Deserialize, JsonSchema)]
struct TaskOutputParams {
task_id: String,
#[serde(default = "default_block")]
block: bool,
#[serde(default = "default_timeout_ms")]
timeout: u64,
}
fn default_block() -> bool {
true
}
fn default_timeout_ms() -> u64 {
BG_TASK_DEFAULT_TIMEOUT_MS
}
#[derive(Debug)]
pub struct TaskOutputTool {
pub manager: Arc<BackgroundManager>,
}
impl TaskOutputTool {
pub const NAME: &'static str = "TaskOutput";
}
impl Tool for TaskOutputTool {
fn name(&self) -> &str {
Self::NAME
}
fn description(&self) -> &str {
r#"
Retrieves output from a running or completed background task (started via Bash with run_in_background: true).
Use block=true (default) to wait for task completion; use block=false for a non-blocking status check.
Returns the task output along with status information.
"#
}
fn parameters_schema(&self) -> Value {
schema_to_tool_params::<TaskOutputParams>()
}
fn execute(&self, arguments: &str, cancelled: &Arc<AtomicBool>) -> ToolResult {
let params: TaskOutputParams = match parse_tool_args(arguments) {
Ok(p) => p,
Err(e) => return e,
};
let timeout_ms = params.timeout.min(BG_TASK_MAX_TIMEOUT_MS);
if self.manager.get_task_status(¶ms.task_id).is_none() {
return ToolResult {
output: format!("后台任务 {} 不存在", params.task_id),
is_error: true,
images: vec![],
plan_decision: PlanDecision::None,
};
}
if params.block && self.manager.is_running(¶ms.task_id) {
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
loop {
if !self.manager.is_running(¶ms.task_id) {
break;
}
if cancelled.load(std::sync::atomic::Ordering::Relaxed) {
let info = self
.manager
.get_task_status(¶ms.task_id)
.unwrap_or(json!({}));
let mut obj = info.clone();
if let Some(map) = obj.as_object_mut() {
map.insert(
"note".to_string(),
json!("cancelled: request was cancelled while waiting for task output"),
);
}
return ToolResult {
output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
is_error: false,
images: vec![],
plan_decision: PlanDecision::None,
};
}
if Instant::now() >= deadline {
let info = self
.manager
.get_task_status(¶ms.task_id)
.unwrap_or(json!({}));
let mut obj = info.clone();
if let Some(map) = obj.as_object_mut() {
map.insert(
"note".to_string(),
json!("still running: timeout waiting for completion"),
);
}
return ToolResult {
output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
is_error: false,
images: vec![],
plan_decision: PlanDecision::None,
};
}
std::thread::sleep(Duration::from_millis(50));
}
}
match self.manager.get_task_status(¶ms.task_id) {
Some(info) => ToolResult {
output: serde_json::to_string_pretty(&info).unwrap_or_default(),
is_error: false,
images: vec![],
plan_decision: PlanDecision::None,
},
None => ToolResult {
output: format!("后台任务 {} 不存在", params.task_id),
is_error: true,
images: vec![],
plan_decision: PlanDecision::None,
},
}
}
}