1use super::{PlanDecision, Tool, ToolResult, parse_tool_args, schema_to_tool_params};
2use crate::constants::{
3 BG_TASK_CMD_DISPLAY_MAX_CHARS, BG_TASK_DEFAULT_TIMEOUT_MS, BG_TASK_MAX_TIMEOUT_MS,
4};
5use crate::util::safe_lock;
6use schemars::JsonSchema;
7use serde::Deserialize;
8use serde_json::{Value, json};
9use std::borrow::Cow;
10use std::collections::HashMap;
11use std::io::Write;
12use std::sync::{
13 Arc, Mutex,
14 atomic::{AtomicBool, Ordering},
15};
16use std::time::{Duration, Instant};
17
18pub(super) struct BgTask {
22 pub task_id: String,
23 pub command: String,
24 pub status: String, pub output_buffer: Arc<Mutex<String>>,
27 pub result: Option<String>,
28 pub started_at: Instant,
30 pub child_pid: Option<u32>,
32 pub is_thread_running: Option<Arc<AtomicBool>>,
34 pub pty_writer: Option<Arc<Mutex<Box<dyn Write + Send>>>>,
36}
37
/// Message queued when a background task finishes or is detected as dead.
///
/// Consumers collect these through `BackgroundManager::drain_notifications`.
#[derive(Debug)]
pub struct BgNotification {
    /// Id of the task the notification is about.
    pub task_id: String,
    /// Command line of that task.
    pub command: String,
    /// Status the task ended with (for example `"dead"`).
    pub status: String,
    /// Result text recorded for the task.
    pub result: String,
}
46
47pub struct BackgroundManager {
51 pub(super) tasks: Mutex<HashMap<String, BgTask>>,
52 notifications: Mutex<Vec<BgNotification>>,
53 next_id: Mutex<u64>,
54}
55
56impl std::fmt::Debug for BackgroundManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 let tasks_count = self.tasks.lock().map_or(0, |t| t.len());
59 let notif_count = self.notifications.lock().map_or(0, |n| n.len());
60 let next_id = self.next_id.lock().map_or(0, |id| *id);
61 f.debug_struct("BackgroundManager")
62 .field("tasks_count", &tasks_count)
63 .field("notifications_count", ¬if_count)
64 .field("next_id", &next_id)
65 .finish()
66 }
67}
68
69impl Default for BackgroundManager {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl BackgroundManager {
76 pub fn new() -> Self {
78 Self {
79 tasks: Mutex::new(HashMap::new()),
80 notifications: Mutex::new(Vec::new()),
81 next_id: Mutex::new(1),
82 }
83 }
84
85 fn gen_id(&self) -> String {
87 let mut id = safe_lock(&self.next_id, "BackgroundManager::gen_id");
88 let current = *id;
89 *id += 1;
90 format!("bg_{}", current)
91 }
92
93 pub fn spawn_command(
98 &self,
99 command: &str,
100 _cwd: Option<String>,
101 _timeout_secs: u64,
102 is_thread_running: Option<Arc<AtomicBool>>,
103 ) -> (String, Arc<Mutex<String>>) {
104 let task_id = self.gen_id();
105 let output_buffer = Arc::new(Mutex::new(String::new()));
106
107 let bg_task = BgTask {
108 task_id: task_id.clone(),
109 command: command.to_string(),
110 status: "running".to_string(),
111 output_buffer: Arc::clone(&output_buffer),
112 result: None,
113 started_at: Instant::now(),
114 child_pid: None,
115 is_thread_running,
116 pty_writer: None,
117 };
118
119 {
120 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::spawn_command");
121 tasks.insert(task_id.clone(), bg_task);
122 }
123
124 (task_id, output_buffer)
125 }
126
127 pub fn adopt_process(
131 &self,
132 command: &str,
133 pid: u32,
134 started_at: Instant,
135 ) -> (String, Arc<Mutex<String>>) {
136 let task_id = self.gen_id();
137 let output_buffer = Arc::new(Mutex::new(String::new()));
138
139 let bg_task = BgTask {
140 task_id: task_id.clone(),
141 command: command.to_string(),
142 status: "running".to_string(),
143 output_buffer: Arc::clone(&output_buffer),
144 result: None,
145 started_at,
146 child_pid: Some(pid),
147 is_thread_running: None,
148 pty_writer: None,
149 };
150
151 {
152 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::adopt_process");
153 tasks.insert(task_id.clone(), bg_task);
154 }
155
156 (task_id, output_buffer)
157 }
158
159 pub fn update_child_pid(&self, task_id: &str, pid: u32) {
161 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::update_child_pid");
162 if let Some(task) = tasks.get_mut(task_id) {
163 task.child_pid = Some(pid);
164 crate::util::log::write_info_log(
165 "BgTask::update_child_pid",
166 &format!("后台任务 {} 已关联子进程 PID: {}", task_id, pid),
167 );
168 }
169 }
170
171 pub fn set_pty_writer(&self, task_id: &str, writer: Box<dyn Write + Send>) {
173 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::set_pty_writer");
174 if let Some(task) = tasks.get_mut(task_id) {
175 task.pty_writer = Some(Arc::new(Mutex::new(writer)));
176 crate::util::log::write_info_log(
177 "BgTask::set_pty_writer",
178 &format!("后台任务 {} 已设置 PTY writer", task_id),
179 );
180 }
181 }
182
183 pub fn session_stdin(&self, task_id: &str, text: &str) -> Result<(), String> {
185 let tasks = safe_lock(&self.tasks, "BackgroundManager::session_stdin");
186 let task = tasks
187 .get(task_id)
188 .ok_or_else(|| "session not found or process already exited".to_string())?;
189 let writer = task
190 .pty_writer
191 .as_ref()
192 .ok_or_else(|| "not an interactive session".to_string())?;
193 let mut w = safe_lock(writer, "session_stdin::pty_writer");
194 w.write_all(text.as_bytes())
195 .and_then(|_| w.flush())
196 .map_err(|e: std::io::Error| e.to_string())
197 }
198
199 pub fn session_stdout(&self, task_id: &str, timeout_ms: u64) -> Result<String, String> {
201 let tasks = safe_lock(&self.tasks, "BackgroundManager::session_stdout");
202 let task = tasks
203 .get(task_id)
204 .ok_or_else(|| "session not found or process already exited".to_string())?;
205 if task.pty_writer.is_none() {
206 return Err("not an interactive session".to_string());
207 }
208 let buf = safe_lock(&task.output_buffer, "session_stdout::buf");
210 let start_len = buf.len();
211 drop(buf);
212 drop(tasks);
213
214 let start = Instant::now();
216 let timeout = Duration::from_millis(timeout_ms);
217 while start.elapsed() < timeout {
218 let tasks = safe_lock(&self.tasks, "session_stdout::poll");
219 if let Some(task) = tasks.get(task_id) {
220 let buf = safe_lock(&task.output_buffer, "session_stdout::poll_buf");
221 if buf.len() > start_len {
222 return Ok(buf[start_len..].to_string());
224 }
225 if task.status != "running" {
227 let output = buf[start_len..].to_string();
228 return Ok(output);
229 }
230 } else {
231 return Err("session not found".to_string());
232 }
233 std::thread::sleep(Duration::from_millis(50));
234 }
235 let tasks = safe_lock(&self.tasks, "session_stdout::timeout");
237 if let Some(task) = tasks.get(task_id) {
238 let buf = safe_lock(&task.output_buffer, "session_stdout::timeout_buf");
239 return Ok(buf[start_len..].to_string());
240 }
241 Ok(String::new())
242 }
243
244 pub fn session_quit(&self, task_id: &str) -> Result<(), String> {
246 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::session_quit");
247 if let Some(task) = tasks.get_mut(task_id) {
248 task.pty_writer = None; task.status = "completed".to_string();
250 task.result = Some("session quit by user".to_string());
251 } else {
252 return Err("session not found".to_string());
253 }
254 Ok(())
255 }
256
257 pub fn complete_task(&self, task_id: &str, status: &str, result: String) {
259 let command;
260 {
261 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::complete_task");
262 if let Some(task) = tasks.get_mut(task_id) {
263 task.status = status.to_string();
264 task.result = Some(result.clone());
265 command = task.command.clone();
266 } else {
267 return;
268 }
269 }
270 {
271 let mut notifs = safe_lock(&self.notifications, "BackgroundManager::complete_notify");
272 notifs.push(BgNotification {
273 task_id: task_id.to_string(),
274 command,
275 status: status.to_string(),
276 result,
277 });
278 }
279 }
280
281 pub fn drain_notifications(&self) -> Vec<BgNotification> {
283 let mut notifs = safe_lock(
284 &self.notifications,
285 "BackgroundManager::drain_notifications",
286 );
287 std::mem::take(&mut *notifs)
288 }
289
290 pub fn get_task_status(&self, task_id: &str) -> Option<Value> {
292 let tasks = safe_lock(&self.tasks, "BackgroundManager::get_task_status");
293 tasks.get(task_id).map(|t| {
294 let output = {
296 let buf = safe_lock(&t.output_buffer, "BgTask::output_buffer");
297 if buf.is_empty() {
298 t.result.clone()
299 } else {
300 Some(buf.clone())
301 }
302 };
303 json!({
304 "task_id": t.task_id,
305 "command": t.command,
306 "status": t.status,
307 "output": output,
308 })
309 })
310 }
311
312 pub fn is_running(&self, task_id: &str) -> bool {
314 let tasks = safe_lock(&self.tasks, "BackgroundManager::is_running");
315 tasks
316 .get(task_id)
317 .map(|t| t.status == "running")
318 .unwrap_or(false)
319 }
320
321 pub fn list_running(&self) -> Vec<(String, String, u64, bool)> {
327 let tasks = safe_lock(&self.tasks, "BackgroundManager::list_running");
328 let now = Instant::now();
329 let mut out: Vec<_> = tasks
330 .values()
331 .filter(|t| t.status == "running")
332 .map(|t| {
333 let elapsed = now.duration_since(t.started_at).as_secs();
334 let cmd_summary = if t.command.chars().count() > 80 {
336 let truncated: String = t
337 .command
338 .chars()
339 .take(BG_TASK_CMD_DISPLAY_MAX_CHARS)
340 .collect();
341 format!("{}...", truncated)
342 } else {
343 t.command.clone()
344 };
345 let is_interactive = t.pty_writer.is_some();
346 (t.task_id.clone(), cmd_summary, elapsed, is_interactive)
347 })
348 .collect();
349 out.sort_by(|a, b| a.0.cmp(&b.0));
350 out
351 }
352
353 pub fn cleanup_dead_tasks(&self) {
356 let mut tasks = safe_lock(&self.tasks, "BackgroundManager::cleanup_dead_tasks");
357 let running_count = tasks.values().filter(|t| t.status == "running").count();
358
359 if running_count == 0 {
360 return;
361 }
362
363 crate::util::log::write_info_log(
364 "BgTask::cleanup_dead_tasks",
365 &format!("开始存活检测,共 {} 个 running 任务", running_count),
366 );
367
368 let mut dead_tasks = Vec::new();
369
370 for task in tasks.values() {
371 if task.status != "running" {
372 continue;
373 }
374
375 let confirmed_alive = if let Some(pid) = task.child_pid {
376 let alive = process_exists(pid);
377 if !alive {
378 crate::util::log::write_info_log(
379 "BgTask::cleanup_dead_tasks",
380 &format!("任务 {} (PID: {}) 进程不存在", task.task_id, pid),
381 );
382 }
383 alive
384 } else if let Some(ref is_running) = task.is_thread_running {
385 let alive = is_running.load(Ordering::Relaxed);
387 if !alive {
388 crate::util::log::write_info_log(
389 "BgTask::cleanup_dead_tasks",
390 &format!(
391 "任务 {} 线程标记已置为 false (cmd: {})",
392 task.task_id, task.command
393 ),
394 );
395 }
396 alive
397 } else {
398 crate::util::log::write_info_log(
400 "BgTask::cleanup_dead_tasks",
401 &format!(
402 "任务 {} 无 PID 且无线程标记,使用 command 匹配检测 (cmd: {})",
403 task.task_id, task.command
404 ),
405 );
406 is_process_alive_by_command(&task.command)
407 };
408
409 if !confirmed_alive {
410 dead_tasks.push((task.task_id.clone(), task.command.clone(), task.child_pid));
411 }
412 }
413
414 for (task_id, command, pid) in &dead_tasks {
416 if let Some(task) = tasks.get_mut(task_id) {
417 task.status = "dead".to_string();
418 let pid_info = pid.map_or(String::new(), |p| format!(" (PID: {})", p));
419 task.result = Some(format!(
420 "进程已终止{}:被外部杀死、崩溃或 PID 被复用",
421 pid_info
422 ));
423 }
424 let pid_info = pid.map_or(String::new(), |p| format!("PID: {}", p));
425 crate::util::log::write_info_log(
426 "BgTask::cleanup_dead_tasks",
427 &format!(
428 "任务 {} ({} cmd: {}) 已确认为 dead",
429 task_id, pid_info, command
430 ),
431 );
432 }
433
434 crate::util::log::write_info_log(
435 "BgTask::cleanup_dead_tasks",
436 &format!("存活检测完成,发现 {} 个 dead 任务", dead_tasks.len()),
437 );
438
439 if !dead_tasks.is_empty() {
441 let mut notifs = safe_lock(
442 &self.notifications,
443 "BackgroundManager::cleanup_dead_tasks_notify",
444 );
445 for (task_id, command, pid) in dead_tasks {
446 let pid_info = pid.map_or(String::new(), |p| format!(" (PID: {})", p));
447 notifs.push(BgNotification {
448 task_id,
449 command,
450 status: "dead".to_string(),
451 result: format!("进程已终止{}:被外部杀死、崩溃或 PID 被复用", pid_info),
452 });
453 }
454 }
455 }
456}
457
458#[cfg(unix)]
463fn process_exists(pid: u32) -> bool {
464 use nix::errno::Errno;
465 use nix::sys::signal::kill;
466 use nix::unistd::Pid;
467
468 match kill(Pid::from_raw(pid as i32), None) {
469 Ok(_) => true,
470 Err(Errno::ESRCH) => false, Err(_) => true, }
473}
474
/// Non-Unix fallback: no existence check is performed, so every PID is reported as
/// alive and PID-tracked tasks are never marked dead by this check.
#[cfg(not(unix))]
fn process_exists(_pid: u32) -> bool {
    true
}
479
/// Best-effort liveness check by process name, for tasks that have neither a PID
/// nor a thread flag.
///
/// Takes the first whitespace-separated token of `command`, reduces it to its file
/// name, and asks `pgrep -x` whether a process with exactly that name exists.
///
/// Returns `false` only when `pgrep` reports "no process matched" (exit status 1).
/// If `pgrep` cannot be started or fails for its own reasons, the process is
/// assumed to be alive, because the check proved nothing.
///
/// Limitation: this is a heuristic. A shell line such as `cd x && tool` is looked
/// up as `cd`, and any unrelated process with the same name counts as a match.
#[cfg(unix)]
fn is_process_alive_by_command(command: &str) -> bool {
    use std::path::Path;
    use std::process::Command;

    let first_token = command.split_whitespace().next().unwrap_or(command);
    // `pgrep -x` compares against the process name, so `/usr/bin/tool` has to be
    // reduced to `tool` or it can never match.
    let cmd_name = Path::new(first_token)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(first_token);

    match Command::new("pgrep").arg("-x").arg(cmd_name).output() {
        // Exit status 1 is pgrep's "nothing matched"; 0 is a match. Anything else
        // (usage error, internal error, killed by a signal) is inconclusive.
        Ok(o) => o.status.code() != Some(1),
        Err(_) => true,
    }
}
492
/// Non-Unix fallback: no lookup is performed, so the process is always reported as
/// alive.
#[cfg(not(unix))]
fn is_process_alive_by_command(_command: &str) -> bool {
    true
}
497
498pub fn build_running_summary(manager: &Arc<BackgroundManager>) -> String {
500 let running = manager.list_running();
501 if running.is_empty() {
502 return String::new();
503 }
504 let (sessions, bg_jobs): (Vec<_>, Vec<_>) = running
505 .into_iter()
506 .partition(|(_, _, _, interactive)| *interactive);
507
508 let mut out = String::new();
509
510 if !bg_jobs.is_empty() {
511 out.push_str(
512 "## Background Tasks\n\n\
513 The following background tasks are still running. \
514 Use TaskOutput to wait for or check their results when needed. \
515 Do not re-spawn these commands.\n",
516 );
517 for (id, cmd, elapsed, _) in &bg_jobs {
518 out.push_str(&format!(
519 "- {} (running {}): {}\n",
520 id,
521 format_elapsed(*elapsed),
522 cmd
523 ));
524 }
525 }
526
527 if !sessions.is_empty() {
528 if !out.is_empty() {
529 out.push('\n');
530 }
531 out.push_str(
532 "## Interactive Sessions\n\n\
533 The following interactive PTY sessions are alive (started via Bash/Powershell with interactive: true). \
534 Use the Session tool (action=stdin/stdout/quit) with the sid below to interact with them. \
535 Do NOT use TaskOutput and do NOT re-spawn these processes — they are still running and waiting for input.\n",
536 );
537 for (id, cmd, elapsed, _) in &sessions {
538 out.push_str(&format!(
539 "- sid={} (alive {}): {}\n",
540 id,
541 format_elapsed(*elapsed),
542 cmd
543 ));
544 }
545 }
546
547 out.trim_end().to_string()
548}
549
/// Formats a duration in seconds as a short label: `42s` below one minute,
/// `3m7s` below one hour, and `2h5m` from one hour up (seconds are dropped).
fn format_elapsed(secs: u64) -> String {
    match secs {
        0..=59 => format!("{}s", secs),
        60..=3599 => {
            let minutes = secs / 60;
            let seconds = secs % 60;
            format!("{}m{}s", minutes, seconds)
        }
        _ => {
            let hours = secs / 3600;
            let minutes = (secs % 3600) / 60;
            format!("{}h{}m", hours, minutes)
        }
    }
}
559
560#[derive(Deserialize, JsonSchema)]
564struct TaskOutputParams {
565 task_id: String,
567 #[serde(default = "default_block")]
569 block: bool,
570 #[serde(default = "default_timeout_ms")]
572 timeout: u64,
573}
574
/// Serde default for `TaskOutputParams::block`: block until the task finishes.
fn default_block() -> bool {
    true
}
578
579fn default_timeout_ms() -> u64 {
580 BG_TASK_DEFAULT_TIMEOUT_MS
581}
582
583#[derive(Debug)]
585pub struct TaskOutputTool {
586 pub manager: Arc<BackgroundManager>,
587}
588
589impl TaskOutputTool {
590 pub const NAME: &'static str = "TaskOutput";
591}
592
593impl Tool for TaskOutputTool {
594 fn name(&self) -> &str {
595 Self::NAME
596 }
597
598 fn description(&self) -> Cow<'_, str> {
599 r#"
600 Retrieves output from a running or completed background task (started via Bash with run_in_background: true).
601 Use block=true (default) to wait for task completion; use block=false for a non-blocking status check.
602 Returns the task output along with status information.
603 "#.into()
604 }
605
606 fn parameters_schema(&self) -> Value {
607 schema_to_tool_params::<TaskOutputParams>()
608 }
609
610 fn execute(&self, arguments: &str, cancelled: &Arc<AtomicBool>) -> ToolResult {
611 let params: TaskOutputParams = match parse_tool_args(arguments) {
612 Ok(p) => p,
613 Err(e) => return e,
614 };
615
616 let timeout_ms = params.timeout.min(BG_TASK_MAX_TIMEOUT_MS);
617
618 if self.manager.get_task_status(¶ms.task_id).is_none() {
620 return ToolResult {
621 output: format!("后台任务 {} 不存在", params.task_id),
622 is_error: true,
623 images: vec![],
624 plan_decision: PlanDecision::None,
625 };
626 }
627
628 if params.block && self.manager.is_running(¶ms.task_id) {
630 let start = Instant::now();
631 let timeout = Duration::from_millis(timeout_ms);
632 loop {
633 if !self.manager.is_running(¶ms.task_id) {
634 break;
635 }
636 if cancelled.load(std::sync::atomic::Ordering::Relaxed) {
638 let info = self
639 .manager
640 .get_task_status(¶ms.task_id)
641 .unwrap_or(json!({}));
642 let mut obj = info.clone();
643 if let Some(map) = obj.as_object_mut() {
644 map.insert(
645 "note".to_string(),
646 json!("cancelled: request was cancelled while waiting for task output"),
647 );
648 }
649 return ToolResult {
650 output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
651 is_error: false,
652 images: vec![],
653 plan_decision: PlanDecision::None,
654 };
655 }
656 if start.elapsed() >= timeout {
657 let info = self
659 .manager
660 .get_task_status(¶ms.task_id)
661 .unwrap_or(json!({}));
662 let mut obj = info.clone();
663 if let Some(map) = obj.as_object_mut() {
664 map.insert(
665 "note".to_string(),
666 json!("still running: timeout waiting for completion"),
667 );
668 }
669 return ToolResult {
670 output: serde_json::to_string_pretty(&obj).unwrap_or_default(),
671 is_error: false,
672 images: vec![],
673 plan_decision: PlanDecision::None,
674 };
675 }
676 std::thread::sleep(Duration::from_millis(50));
677 }
678 }
679
680 match self.manager.get_task_status(¶ms.task_id) {
682 Some(info) => ToolResult {
683 output: serde_json::to_string_pretty(&info).unwrap_or_default(),
684 is_error: false,
685 images: vec![],
686 plan_decision: PlanDecision::None,
687 },
688 None => ToolResult {
689 output: format!("后台任务 {} 不存在", params.task_id),
690 is_error: true,
691 images: vec![],
692 plan_decision: PlanDecision::None,
693 },
694 }
695 }
696}