1use super::{PlanDecision, Tool, ToolResult, parse_tool_args, schema_to_tool_params};
2use crate::constants::{
3 BG_TASK_CMD_DISPLAY_MAX_CHARS, BG_TASK_DEFAULT_TIMEOUT_MS, BG_TASK_MAX_TIMEOUT_MS,
4};
5use crate::util::safe_lock;
6use schemars::JsonSchema;
7use serde::Deserialize;
8use serde_json::{Value, json};
9use std::borrow::Cow;
10use std::collections::HashMap;
11use std::io::Write;
12use std::sync::{
13 Arc, Mutex,
14 atomic::{AtomicBool, Ordering},
15};
16use std::time::{Duration, Instant};
17
18pub(super) struct BgTask {
22 pub task_id: String,
23 pub command: String,
24 pub status: String, pub output_buffer: Arc<Mutex<String>>,
27 pub result: Option<String>,
28 pub started_at: Instant,
30 pub child_pid: Option<u32>,
32 pub is_thread_running: Option<Arc<AtomicBool>>,
34 pub pty_writer: Option<Arc<Mutex<Box<dyn Write + Send>>>>,
36}
37
38#[derive(Debug)]
40pub struct BgNotification {
41 pub task_id: String,
42 pub command: String,
43 pub status: String,
44 pub result: String,
45}
46
47pub struct BackgroundManager {
51 pub(super) tasks: Mutex<HashMap<String, BgTask>>,
52 notifications: Mutex<Vec<BgNotification>>,
53 next_id: Mutex<u64>,
54}
55
56impl std::fmt::Debug for BackgroundManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 let tasks_count = self.tasks.lock().map_or(0, |t| t.len());
59 let notif_count = self.notifications.lock().map_or(0, |n| n.len());
60 let next_id = self.next_id.lock().map_or(0, |id| *id);
61 f.debug_struct("BackgroundManager")
62 .field("tasks_count", &tasks_count)
63 .field("notifications_count", ¬if_count)
64 .field("next_id", &next_id)
65 .finish()
66 }
67}
68
69impl Default for BackgroundManager {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl BackgroundManager {
76 pub fn new() -> Self {
78 Self {
79 tasks: Mutex::new(HashMap::new()),
80 notifications: Mutex::new(Vec::new()),
81 next_id: Mutex::new(1),
82 }
83 }
84
85 fn gen_id(&self) -> String {
87 let mut id = safe_lock(&self.next_id, "BackgroundManager::gen_id");
88 let current = *id;
89 *id += 1;
90 format!("bg_{}", current)
91 }
92
93 pub fn spawn_command(
98 &self,
99 command: &str,
100 _cwd: Option<String>,
101 _timeout_secs: u64,
102 is_thread_running: Option<Arc<AtomicBool>>,
103 ) -> (String, Arc<Mutex<String>>) {
104 let task_id = self.gen_id();
105 let output_buffer = Arc::new(Mutex::new(String::new()));
106
107 let bg_task = BgTask {
108 task_id: task_id.clone(),
109 command: command.to_string(),
110 status: "running".to_string(),
111 output_buffer: Arc::clone(&output_buffer),
112 result: None,
113 started_at: Instant::now(),
114 child_pid: None,
115 is_thread_running,
116 pty_writer: None,
117 };
118
119 {
120 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::spawn_command");
121 tasks.insert(task_id.clone(), bg_task);
122 }
123
124 (task_id, output_buffer)
125 }
126
127 pub fn adopt_process(
131 &self,
132 command: &str,
133 pid: u32,
134 started_at: Instant,
135 ) -> (String, Arc<Mutex<String>>) {
136 let task_id = self.gen_id();
137 let output_buffer = Arc::new(Mutex::new(String::new()));
138
139 let bg_task = BgTask {
140 task_id: task_id.clone(),
141 command: command.to_string(),
142 status: "running".to_string(),
143 output_buffer: Arc::clone(&output_buffer),
144 result: None,
145 started_at,
146 child_pid: Some(pid),
147 is_thread_running: None,
148 pty_writer: None,
149 };
150
151 {
152 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::adopt_process");
153 tasks.insert(task_id.clone(), bg_task);
154 }
155
156 (task_id, output_buffer)
157 }
158
159 pub fn update_child_pid(&self, task_id: &str, pid: u32) {
161 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::update_child_pid");
162 if let Some(task) = tasks.get_mut(task_id) {
163 task.child_pid = Some(pid);
164 crate::util::log::write_info_log(
165 "BgTask::update_child_pid",
166 &format!("后台任务 {} 已关联子进程 PID: {}", task_id, pid),
167 );
168 }
169 }
170
171 pub fn set_pty_writer(&self, task_id: &str, writer: Box<dyn Write + Send>) {
173 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::set_pty_writer");
174 if let Some(task) = tasks.get_mut(task_id) {
175 task.pty_writer = Some(Arc::new(Mutex::new(writer)));
176 crate::util::log::write_info_log(
177 "BgTask::set_pty_writer",
178 &format!("后台任务 {} 已设置 PTY writer", task_id),
179 );
180 }
181 }
182
183 pub fn session_stdin(&self, task_id: &str, text: &str) -> Result<(), String> {
185 let tasks = safe_lock(&self.tasks, "BackgroundManager::session_stdin");
186 let task = tasks
187 .get(task_id)
188 .ok_or_else(|| "session not found or process already exited".to_string())?;
189 let writer = task
190 .pty_writer
191 .as_ref()
192 .ok_or_else(|| "not an interactive session".to_string())?;
193 let mut w = safe_lock(writer, "session_stdin::pty_writer");
194 w.write_all(text.as_bytes())
195 .and_then(|_| w.flush())
196 .map_err(|e: std::io::Error| e.to_string())
197 }
198
199 pub fn session_stdout(&self, task_id: &str, timeout_ms: u64) -> Result<String, String> {
201 let tasks = safe_lock(&self.tasks, "BackgroundManager::session_stdout");
202 let task = tasks
203 .get(task_id)
204 .ok_or_else(|| "session not found or process already exited".to_string())?;
205 if task.pty_writer.is_none() {
206 return Err("not an interactive session".to_string());
207 }
208 let buf = safe_lock(&task.output_buffer, "session_stdout::buf");
210 let start_len = buf.len();
211 drop(buf);
212 drop(tasks);
213
214 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
216 while Instant::now() < deadline {
217 let tasks = safe_lock(&self.tasks, "session_stdout::poll");
218 if let Some(task) = tasks.get(task_id) {
219 let buf = safe_lock(&task.output_buffer, "session_stdout::poll_buf");
220 if buf.len() > start_len {
221 return Ok(buf[start_len..].to_string());
223 }
224 if task.status != "running" {
226 let output = buf[start_len..].to_string();
227 return Ok(output);
228 }
229 } else {
230 return Err("session not found".to_string());
231 }
232 std::thread::sleep(Duration::from_millis(50));
233 }
234 let tasks = safe_lock(&self.tasks, "session_stdout::timeout");
236 if let Some(task) = tasks.get(task_id) {
237 let buf = safe_lock(&task.output_buffer, "session_stdout::timeout_buf");
238 return Ok(buf[start_len..].to_string());
239 }
240 Ok(String::new())
241 }
242
243 pub fn session_quit(&self, task_id: &str) -> Result<(), String> {
245 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::session_quit");
246 if let Some(task) = tasks.get_mut(task_id) {
247 task.pty_writer = None; task.status = "completed".to_string();
249 task.result = Some("session quit by user".to_string());
250 } else {
251 return Err("session not found".to_string());
252 }
253 Ok(())
254 }
255
256 pub fn complete_task(&self, task_id: &str, status: &str, result: String) {
258 let command;
259 {
260 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::complete_task");
261 if let Some(task) = tasks.get_mut(task_id) {
262 task.status = status.to_string();
263 task.result = Some(result.clone());
264 command = task.command.clone();
265 } else {
266 return;
267 }
268 }
269 {
270 let mut notifs = safe_lock(&self.notifications, "BackgroundManager::complete_notify");
271 notifs.push(BgNotification {
272 task_id: task_id.to_string(),
273 command,
274 status: status.to_string(),
275 result,
276 });
277 }
278 }
279
280 pub fn drain_notifications(&self) -> Vec<BgNotification> {
282 let mut notifs = safe_lock(
283 &self.notifications,
284 "BackgroundManager::drain_notifications",
285 );
286 std::mem::take(&mut *notifs)
287 }
288
289 pub fn get_task_status(&self, task_id: &str) -> Option<Value> {
291 let tasks = safe_lock(&self.tasks, "BackgroundManager::get_task_status");
292 tasks.get(task_id).map(|t| {
293 let output = {
295 let buf = safe_lock(&t.output_buffer, "BgTask::output_buffer");
296 if buf.is_empty() {
297 t.result.clone()
298 } else {
299 Some(buf.clone())
300 }
301 };
302 json!({
303 "task_id": t.task_id,
304 "command": t.command,
305 "status": t.status,
306 "output": output,
307 })
308 })
309 }
310
311 pub fn is_running(&self, task_id: &str) -> bool {
313 let tasks = safe_lock(&self.tasks, "BackgroundManager::is_running");
314 tasks
315 .get(task_id)
316 .map(|t| t.status == "running")
317 .unwrap_or(false)
318 }
319
320 pub fn list_running(&self) -> Vec<(String, String, u64, bool)> {
326 let tasks = safe_lock(&self.tasks, "BackgroundManager::list_running");
327 let now = Instant::now();
328 let mut out: Vec<_> = tasks
329 .values()
330 .filter(|t| t.status == "running")
331 .map(|t| {
332 let elapsed = now.duration_since(t.started_at).as_secs();
333 let cmd_summary = if t.command.chars().count() > 80 {
335 let truncated: String = t
336 .command
337 .chars()
338 .take(BG_TASK_CMD_DISPLAY_MAX_CHARS)
339 .collect();
340 format!("{}...", truncated)
341 } else {
342 t.command.clone()
343 };
344 let is_interactive = t.pty_writer.is_some();
345 (t.task_id.clone(), cmd_summary, elapsed, is_interactive)
346 })
347 .collect();
348 out.sort_by(|a, b| a.0.cmp(&b.0));
349 out
350 }
351
352 pub fn cleanup_dead_tasks(&self) {
355 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::cleanup_dead_tasks");
356 let running_count = tasks.values().filter(|t| t.status == "running").count();
357
358 if running_count == 0 {
359 return;
360 }
361
362 crate::util::log::write_info_log(
363 "BgTask::cleanup_dead_tasks",
364 &format!("开始存活检测,共 {} 个 running 任务", running_count),
365 );
366
367 let mut dead_tasks = Vec::new();
368
369 for task in tasks.values() {
370 if task.status != "running" {
371 continue;
372 }
373
374 let confirmed_alive = if let Some(pid) = task.child_pid {
375 let alive = process_exists(pid);
376 if !alive {
377 crate::util::log::write_info_log(
378 "BgTask::cleanup_dead_tasks",
379 &format!("任务 {} (PID: {}) 进程不存在", task.task_id, pid),
380 );
381 }
382 alive
383 } else if let Some(ref is_running) = task.is_thread_running {
384 let alive = is_running.load(Ordering::Relaxed);
386 if !alive {
387 crate::util::log::write_info_log(
388 "BgTask::cleanup_dead_tasks",
389 &format!(
390 "任务 {} 线程标记已置为 false (cmd: {})",
391 task.task_id, task.command
392 ),
393 );
394 }
395 alive
396 } else {
397 crate::util::log::write_info_log(
399 "BgTask::cleanup_dead_tasks",
400 &format!(
401 "任务 {} 无 PID 且无线程标记,使用 command 匹配检测 (cmd: {})",
402 task.task_id, task.command
403 ),
404 );
405 is_process_alive_by_command(&task.command)
406 };
407
408 if !confirmed_alive {
409 dead_tasks.push((task.task_id.clone(), task.command.clone(), task.child_pid));
410 }
411 }
412
413 for (task_id, command, pid) in &dead_tasks {
415 if let Some(task) = tasks.get_mut(task_id) {
416 task.status = "dead".to_string();
417 let pid_info = pid.map_or(String::new(), |p| format!(" (PID: {})", p));
418 task.result = Some(format!(
419 "进程已终止{}:被外部杀死、崩溃或 PID 被复用",
420 pid_info
421 ));
422 }
423 let pid_info = pid.map_or(String::new(), |p| format!("PID: {}", p));
424 crate::util::log::write_info_log(
425 "BgTask::cleanup_dead_tasks",
426 &format!(
427 "任务 {} ({} cmd: {}) 已确认为 dead",
428 task_id, pid_info, command
429 ),
430 );
431 }
432
433 crate::util::log::write_info_log(
434 "BgTask::cleanup_dead_tasks",
435 &format!("存活检测完成,发现 {} 个 dead 任务", dead_tasks.len()),
436 );
437
438 if !dead_tasks.is_empty() {
440 let mut notifs = safe_lock(
441 &self.notifications,
442 "BackgroundManager::cleanup_dead_tasks_notify",
443 );
444 for (task_id, command, pid) in dead_tasks {
445 let pid_info = pid.map_or(String::new(), |p| format!(" (PID: {})", p));
446 notifs.push(BgNotification {
447 task_id,
448 command,
449 status: "dead".to_string(),
450 result: format!("进程已终止{}:被外部杀死、崩溃或 PID 被复用", pid_info),
451 });
452 }
453 }
454 }
455}
456
457#[cfg(unix)]
462fn process_exists(pid: u32) -> bool {
463 use nix::errno::Errno;
464 use nix::sys::signal::kill;
465 use nix::unistd::Pid;
466
467 match kill(Pid::from_raw(pid as i32), None) {
468 Ok(_) => true,
469 Err(Errno::ESRCH) => false, Err(_) => true, }
472}
473
474#[cfg(not(unix))]
475fn process_exists(_pid: u32) -> bool {
476 true
477}
478
479#[cfg(unix)]
481fn is_process_alive_by_command(command: &str) -> bool {
482 use std::process::Command;
483
484 let cmd_name = command.split_whitespace().next().unwrap_or(command);
485 let output = Command::new("pgrep").arg("-x").arg(cmd_name).output();
486 match output {
487 Ok(o) => o.status.success(),
488 Err(_) => true, }
490}
491
492#[cfg(not(unix))]
493fn is_process_alive_by_command(_command: &str) -> bool {
494 true
495}
496
497pub fn build_running_summary(manager: &Arc<BackgroundManager>) -> String {
499 let running = manager.list_running();
500 if running.is_empty() {
501 return String::new();
502 }
503 let (sessions, bg_jobs): (Vec<_>, Vec<_>) = running
504 .into_iter()
505 .partition(|(_, _, _, interactive)| *interactive);
506
507 let mut out = String::new();
508
509 if !bg_jobs.is_empty() {
510 out.push_str(
511 "## Background Tasks\n\n\
512 The following background tasks are still running. \
513 Use TaskOutput to wait for or check their results when needed. \
514 Do not re-spawn these commands.\n",
515 );
516 for (id, cmd, elapsed, _) in &bg_jobs {
517 out.push_str(&format!(
518 "- {} (running {}): {}\n",
519 id,
520 format_elapsed(*elapsed),
521 cmd
522 ));
523 }
524 }
525
526 if !sessions.is_empty() {
527 if !out.is_empty() {
528 out.push('\n');
529 }
530 out.push_str(
531 "## Interactive Sessions\n\n\
532 The following interactive PTY sessions are alive (started via Bash/Powershell with interactive: true). \
533 Use the Session tool (action=stdin/stdout/quit) with the sid below to interact with them. \
534 Do NOT use TaskOutput and do NOT re-spawn these processes — they are still running and waiting for input.\n",
535 );
536 for (id, cmd, elapsed, _) in &sessions {
537 out.push_str(&format!(
538 "- sid={} (alive {}): {}\n",
539 id,
540 format_elapsed(*elapsed),
541 cmd
542 ));
543 }
544 }
545
546 out.trim_end().to_string()
547}
548
549fn format_elapsed(secs: u64) -> String {
550 if secs < 60 {
551 format!("{}s", secs)
552 } else if secs < 3600 {
553 format!("{}m{}s", secs / 60, secs % 60)
554 } else {
555 format!("{}h{}m", secs / 3600, (secs % 3600) / 60)
556 }
557}
558
559#[derive(Deserialize, JsonSchema)]
563struct TaskOutputParams {
564 task_id: String,
566 #[serde(default = "default_block")]
568 block: bool,
569 #[serde(default = "default_timeout_ms")]
571 timeout: u64,
572}
573
574fn default_block() -> bool {
575 true
576}
577
578fn default_timeout_ms() -> u64 {
579 BG_TASK_DEFAULT_TIMEOUT_MS
580}
581
582#[derive(Debug)]
584pub struct TaskOutputTool {
585 pub manager: Arc<BackgroundManager>,
586}
587
588impl TaskOutputTool {
589 pub const NAME: &'static str = "TaskOutput";
590}
591
592impl Tool for TaskOutputTool {
593 fn name(&self) -> &str {
594 Self::NAME
595 }
596
597 fn description(&self) -> Cow<'_, str> {
598 r#"
599 Retrieves output from a running or completed background task (started via Bash with run_in_background: true).
600 Use block=true (default) to wait for task completion; use block=false for a non-blocking status check.
601 Returns the task output along with status information.
602 "#.into()
603 }
604
605 fn parameters_schema(&self) -> Value {
606 schema_to_tool_params::<TaskOutputParams>()
607 }
608
609 fn execute(&self, arguments: &str, cancelled: &Arc<AtomicBool>) -> ToolResult {
610 let params: TaskOutputParams = match parse_tool_args(arguments) {
611 Ok(p) => p,
612 Err(e) => return e,
613 };
614
615 let timeout_ms = params.timeout.min(BG_TASK_MAX_TIMEOUT_MS);
616
617 if self.manager.get_task_status(¶ms.task_id).is_none() {
619 return ToolResult {
620 output: format!("后台任务 {} 不存在", params.task_id),
621 is_error: true,
622 images: vec![],
623 plan_decision: PlanDecision::None,
624 };
625 }
626
627 if params.block && self.manager.is_running(¶ms.task_id) {
629 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
630 loop {
631 if !self.manager.is_running(¶ms.task_id) {
632 break;
633 }
634 if cancelled.load(std::sync::atomic::Ordering::Relaxed) {
636 let info = self
637 .manager
638 .get_task_status(¶ms.task_id)
639 .unwrap_or(json!({}));
640 let mut obj = info.clone();
641 if let Some(map) = obj.as_object_mut() {
642 map.insert(
643 "note".to_string(),
644 json!("cancelled: request was cancelled while waiting for task output"),
645 );
646 }
647 return ToolResult {
648 output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
649 is_error: false,
650 images: vec![],
651 plan_decision: PlanDecision::None,
652 };
653 }
654 if Instant::now() >= deadline {
655 let info = self
657 .manager
658 .get_task_status(¶ms.task_id)
659 .unwrap_or(json!({}));
660 let mut obj = info.clone();
661 if let Some(map) = obj.as_object_mut() {
662 map.insert(
663 "note".to_string(),
664 json!("still running: timeout waiting for completion"),
665 );
666 }
667 return ToolResult {
668 output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
669 is_error: false,
670 images: vec![],
671 plan_decision: PlanDecision::None,
672 };
673 }
674 std::thread::sleep(Duration::from_millis(50));
675 }
676 }
677
678 match self.manager.get_task_status(¶ms.task_id) {
680 Some(info) => ToolResult {
681 output: serde_json::to_string_pretty(&info).unwrap_or_default(),
682 is_error: false,
683 images: vec![],
684 plan_decision: PlanDecision::None,
685 },
686 None => ToolResult {
687 output: format!("后台任务 {} 不存在", params.task_id),
688 is_error: true,
689 images: vec![],
690 plan_decision: PlanDecision::None,
691 },
692 }
693 }
694}