ai_agent/tasks/
kill_shell_tasks.rs1#![allow(dead_code)]
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use crate::tasks::guards::{LocalShellTaskState, is_local_shell_task_from_value};
9use crate::types::ids::AgentId;
10
11type SetAppStateFn = Box<dyn Fn(Box<dyn Fn(&serde_json::Value) -> serde_json::Value>)>;
13
14type GetAppStateFn = Box<dyn Fn() -> serde_json::Value>;
16
17pub fn kill_task(task_id: &str, set_app_state: &SetAppStateFn) {
19 let task_id_owned = task_id.to_string();
20 set_app_state(Box::new(move |prev: &serde_json::Value| {
21 let mut prev = prev.clone();
22
23 let tasks = prev.get("tasks").and_then(|t| t.as_object());
24 if tasks.is_none() {
25 return prev;
26 }
27
28 let tasks = tasks.unwrap();
29 let task = tasks.get(task_id_owned.as_str());
30 if task.is_none() {
31 return prev;
32 }
33
34 let task = task.unwrap();
35
36 if !is_local_shell_task_from_value(task) {
38 return prev;
39 }
40
41 let status = task.get("status").and_then(|s| s.as_str()).unwrap_or("");
42 if status != "running" {
43 return prev;
44 }
45
46 log_for_debugging(&format!("LocalShellTask {} kill requested", task_id_owned));
48
49 if let Some(shell_cmd) = task.get("shellCommand") {
52 if !shell_cmd.is_null() {
53 }
56 }
57
58 let now = std::time::SystemTime::now()
63 .duration_since(std::time::UNIX_EPOCH)
64 .map(|d| d.as_millis() as u64)
65 .unwrap_or(0);
66
67 let mut updated_task = task.clone();
68 if let Some(obj) = updated_task.as_object_mut() {
69 obj.insert("status".to_string(), serde_json::json!("killed"));
70 obj.insert("notified".to_string(), serde_json::json!(true));
71 obj.insert("shellCommand".to_string(), serde_json::json!(null));
72 obj.insert("endTime".to_string(), serde_json::json!(now));
73 }
74
75 let mut new_tasks = tasks.clone();
76 new_tasks.insert(task_id_owned.clone(), updated_task);
77
78 if let Some(obj) = prev.as_object_mut() {
79 obj.insert("tasks".to_string(), serde_json::json!(new_tasks));
80 }
81
82 prev
83 }));
84
85 let task_id = task_id.to_string();
87 tokio::spawn(async move {
88 let _ = evict_task_output(&task_id).await;
89 });
90}
91
92pub fn kill_shell_tasks_for_agent(
96 agent_id: &AgentId,
97 get_app_state: &GetAppStateFn,
98 set_app_state: &SetAppStateFn,
99) {
100 let app_state = get_app_state();
101 let tasks = app_state.get("tasks").and_then(|t| t.as_object());
102
103 if let Some(tasks) = tasks {
104 for (task_id, task) in tasks {
105 if is_local_shell_task_from_value(task) {
106 let task_agent_id = task
107 .get("agentId")
108 .and_then(|v| v.as_str())
109 .map(|s| s.to_string());
110
111 let status = task.get("status").and_then(|s| s.as_str()).unwrap_or("");
112
113 if task_agent_id.as_deref() == Some(agent_id.as_str()) && status == "running" {
114 log_for_debugging(&format!(
115 "kill_shell_tasks_for_agent: killing orphaned shell task {} (agent {} exiting)",
116 task_id, agent_id
117 ));
118 kill_task(task_id, set_app_state);
119 }
120 }
121 }
122 }
123
124 dequeue_all_matching(|cmd: &serde_json::Value| {
129 cmd.get("agentId")
130 .and_then(|v| v.as_str())
131 .map(|s| s == agent_id.as_str())
132 .unwrap_or(false)
133 });
134}
135
136fn log_for_debugging(msg: &str) {
138 eprintln!("[DEBUG] {}", msg);
141}
142
143async fn evict_task_output(_task_id: &str) -> std::io::Result<()> {
145 Ok(())
147}
148
149fn dequeue_all_matching<F>(_filter: F)
151where
152 F: Fn(&serde_json::Value) -> bool,
153{
154 }