1use std::collections::BTreeMap;
32use std::path::PathBuf;
33use std::process::{Child, Stdio};
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use std::sync::{Arc, LazyLock, Mutex, OnceLock};
36use std::time::{Duration, SystemTime, UNIX_EPOCH};
37
38use harn_vm::VmValue;
39
40use harn_vm::process_sandbox;
41
42use crate::error::HostlibError;
43
/// Process-wide counter that supplies the numeric suffix of each handle id.
///
/// Starts at 1 and is bumped once per successful spawn in
/// `spawn_long_running`.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
46
/// Cancellation flag shared between a store entry and its waiter thread.
struct CancelState {
    /// Set by `do_kill`; `waiter_thread` reads it after the child exits and
    /// skips the completion feedback when it is `true`.
    cancelled: AtomicBool,
}
53
/// Bookkeeping for one spawned process, keyed by handle id in `HandleStore`.
struct HandleEntry {
    /// The child process handle. `waiter_thread` takes it out of the entry
    /// (leaving `None`) as soon as it starts, so cancel paths usually find
    /// `None` here and fall back to killing by `pid`.
    child: Option<Child>,
    /// OS process id recorded at spawn time.
    pid: u32,
    /// Session the process belongs to; used to cancel every handle of a
    /// session at once.
    session_id: String,
    /// Flag shared with the waiter thread (see `CancelState`).
    cancel_state: Arc<CancelState>,
}
64
/// Registry of live long-running handles.
#[derive(Default)]
struct HandleStore {
    /// Handle id -> entry. An entry is removed either by the waiter thread
    /// once the child has exited or by one of the cancel functions.
    entries: BTreeMap<String, HandleEntry>,
}
69
/// Global, lazily-initialised handle registry shared by the spawn, waiter and
/// cancel paths. Every access goes through the mutex.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
    LazyLock::new(|| Mutex::new(HandleStore::default()));
72
/// Description of a freshly spawned long-running process, returned by
/// `spawn_long_running`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LongRunningHandleInfo {
    /// Identifier to pass to `cancel_handle`; also echoed in the completion
    /// payload.
    pub handle_id: String,
    /// Spawn time in milliseconds since the Unix epoch (0 if the system clock
    /// reads earlier than the epoch).
    pub started_at_ms: u64,
    /// Program and arguments joined with single spaces, for display only.
    pub command_display: String,
}
83
84impl LongRunningHandleInfo {
85 pub fn into_handle_response(self) -> VmValue {
87 super::response::ResponseBuilder::new()
88 .str("handle_id", self.handle_id)
89 .int("started_at", self.started_at_ms as i64)
90 .str("command", self.command_display)
91 .build()
92 }
93}
94
95pub fn spawn_long_running(
100 builtin: &'static str,
101 program: String,
102 args: Vec<String>,
103 cwd: Option<PathBuf>,
104 env: BTreeMap<String, String>,
105 session_id: String,
106) -> Result<LongRunningHandleInfo, HostlibError> {
107 if program.is_empty() {
108 return Err(HostlibError::InvalidParameter {
109 builtin,
110 param: "argv",
111 message: "first element of argv must be a non-empty program name".to_string(),
112 });
113 }
114
115 let mut command =
116 process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
117 builtin,
118 message: format!("sandbox setup failed: {e:?}"),
119 })?;
120
121 if let Some(cwd_path) = cwd.as_ref() {
122 process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
123 builtin,
124 message: format!("sandbox cwd rejected: {e:?}"),
125 })?;
126 command.current_dir(cwd_path);
127 }
128
129 if !env.is_empty() {
130 command.env_clear();
131 for (key, value) in &env {
132 command.env(key, value);
133 }
134 }
135
136 command.stdout(Stdio::piped());
137 command.stderr(Stdio::piped());
138 command.stdin(Stdio::null());
139
140 let child = command.spawn().map_err(|e| {
141 if let Some(violation) = process_sandbox::process_spawn_error(&e) {
142 return HostlibError::Backend {
143 builtin,
144 message: format!("sandbox rejected spawn: {violation:?}"),
145 };
146 }
147 HostlibError::Backend {
148 builtin,
149 message: format!("spawn failed: {e}"),
150 }
151 })?;
152
153 let pid = child.id();
154 let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
155 let handle_id = format!("hto-{:x}-{id}", std::process::id());
156
157 let started_at_ms = SystemTime::now()
158 .duration_since(UNIX_EPOCH)
159 .map(|d| d.as_millis() as u64)
160 .unwrap_or(0);
161
162 let mut all_argv = vec![program.clone()];
163 all_argv.extend(args.iter().cloned());
164 let command_display = all_argv.join(" ");
165
166 let cancel_state = Arc::new(CancelState {
167 cancelled: AtomicBool::new(false),
168 });
169
170 {
171 let mut store = HANDLE_STORE
172 .lock()
173 .expect("long-running handle store poisoned");
174 store.entries.insert(
175 handle_id.clone(),
176 HandleEntry {
177 child: Some(child),
178 pid,
179 session_id: session_id.clone(),
180 cancel_state: cancel_state.clone(),
181 },
182 );
183 }
184
185 let waiter_handle_id = handle_id.clone();
186 let waiter_session_id = session_id;
187 std::thread::Builder::new()
188 .name(format!("hto-waiter-{waiter_handle_id}"))
189 .spawn(move || {
190 waiter_thread(waiter_handle_id, waiter_session_id, cancel_state);
191 })
192 .map_err(|e| HostlibError::Backend {
193 builtin,
194 message: format!("failed to spawn waiter thread: {e}"),
195 })?;
196
197 Ok(LongRunningHandleInfo {
198 handle_id,
199 started_at_ms,
200 command_display,
201 })
202}
203
204fn waiter_thread(handle_id: String, session_id: String, cancel_state: Arc<CancelState>) {
206 let waiter_start = std::time::Instant::now();
207
208 let mut child = {
211 let mut store = HANDLE_STORE
212 .lock()
213 .expect("long-running handle store poisoned");
214 match store.entries.get_mut(&handle_id) {
215 Some(entry) => match entry.child.take() {
216 Some(c) => c,
217 None => return, },
219 None => return, }
221 };
222
223 use std::io::Read;
225 let mut stdout_bytes = Vec::new();
226 let mut stderr_bytes = Vec::new();
227 let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
228 let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
229
230 if let Some(mut out) = child.stdout.take() {
231 std::thread::spawn(move || {
232 let _ = out.read_to_end(&mut stdout_bytes);
233 let _ = out_tx.send(stdout_bytes);
234 });
235 }
236 if let Some(mut err) = child.stderr.take() {
237 std::thread::spawn(move || {
238 let _ = err.read_to_end(&mut stderr_bytes);
239 let _ = err_tx.send(stderr_bytes);
240 });
241 }
242
243 let status = child.wait().ok();
244
245 let stdout = out_rx
246 .recv_timeout(Duration::from_secs(5))
247 .unwrap_or_default();
248 let stderr = err_rx
249 .recv_timeout(Duration::from_secs(5))
250 .unwrap_or_default();
251
252 {
254 let mut store = HANDLE_STORE
255 .lock()
256 .expect("long-running handle store poisoned");
257 store.entries.remove(&handle_id);
258 }
259
260 if cancel_state.cancelled.load(Ordering::Acquire) {
263 return;
264 }
265
266 let (exit_code, signal_name) = match status {
267 Some(s) => decode_exit_status(s),
268 None => (-1, Some("SIGKILL".to_string())),
270 };
271 let duration_ms = waiter_start.elapsed().as_millis() as i64;
272
273 let mut payload = serde_json::Map::new();
274 payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
275 payload.insert(
276 "exit_code".into(),
277 serde_json::Value::Number(exit_code.into()),
278 );
279 payload.insert(
280 "stdout".into(),
281 serde_json::Value::String(String::from_utf8_lossy(&stdout).into_owned()),
282 );
283 payload.insert(
284 "stderr".into(),
285 serde_json::Value::String(String::from_utf8_lossy(&stderr).into_owned()),
286 );
287 payload.insert(
288 "duration_ms".into(),
289 serde_json::Value::Number(duration_ms.into()),
290 );
291 if let Some(sig) = signal_name {
292 payload.insert("signal".into(), serde_json::Value::String(sig));
293 } else {
294 payload.insert("signal".into(), serde_json::Value::Null);
295 }
296
297 let content = serde_json::to_string(&payload).unwrap_or_default();
298 harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
299}
300
301pub fn cancel_handle(handle_id: &str) -> bool {
304 let (pid, child, cancel_state) = {
305 let mut store = HANDLE_STORE
306 .lock()
307 .expect("long-running handle store poisoned");
308 match store.entries.remove(handle_id) {
309 None => return false,
310 Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
311 }
312 };
313 do_kill(pid, child, cancel_state);
314 true
315}
316
317pub fn cancel_session_handles(session_id: &str) {
320 let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
321 let mut store = HANDLE_STORE
322 .lock()
323 .expect("long-running handle store poisoned");
324 let matching: Vec<String> = store
325 .entries
326 .iter()
327 .filter(|(_, e)| e.session_id == session_id)
328 .map(|(id, _)| id.clone())
329 .collect();
330 matching
331 .into_iter()
332 .filter_map(|id| {
333 store.entries.remove(&id).map(|mut e| {
334 let child = e.child.take();
335 (e.pid, child, e.cancel_state.clone())
336 })
337 })
338 .collect()
339 };
340 for (pid, child, cancel_state) in to_kill {
341 do_kill(pid, child, cancel_state);
342 }
343}
344
345fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
348 cancel_state.cancelled.store(true, Ordering::Release);
350 if let Some(mut c) = child {
351 kill_child(&mut c);
353 } else {
354 kill_pid(pid);
356 }
357}
358
359pub(crate) fn register_cleanup_hook() {
363 static REGISTERED: OnceLock<()> = OnceLock::new();
364 REGISTERED.get_or_init(|| {
365 let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
366 cancel_session_handles(session_id);
367 });
368 harn_vm::register_session_end_hook(hook);
369 });
370}
371
/// Kills `child` and reaps it. Both steps are best-effort: errors (such as
/// the process having already exited) are ignored.
fn kill_child(child: &mut Child) {
    child.kill().ok();
    child.wait().ok();
}
376
/// Sends SIGKILL to the process with id `pid` (Unix only; a no-op elsewhere).
///
/// Best-effort: the result of `kill(2)` is ignored. Pids that are zero or do
/// not fit in a positive `i32` are ignored, because `kill(2)` interprets
/// zero and negative values as process-group or broadcast targets rather than
/// a single process.
fn kill_pid(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }

        const SIGKILL: i32 = 9;

        // `pid as i32` would turn 0 into "my whole process group" and values
        // above i32::MAX into negative (group / everyone) targets.
        if pid == 0 || pid > i32::MAX as u32 {
            return;
        }

        // SAFETY: `kill` takes two plain integers and dereferences no memory;
        // the declaration assumes `pid_t` and `int` are both 32-bit, as on the
        // Unix targets this is built for. `pid` was checked above to be a
        // strictly positive value that fits in `i32`.
        unsafe {
            kill(pid as i32, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
    }
}
396
/// Splits an exit status into `(exit_code, signal)`.
///
/// On Unix a normal exit yields `(code, None)`, death by signal `N` yields
/// `(-1, Some("SIG<N>"))`, and anything else `(-1, None)`. On other platforms
/// the signal is always `None` and a missing code is reported as `-1`.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;

        match (status.code(), status.signal()) {
            (Some(code), _) => (code, None),
            (None, Some(sig)) => (-1, Some(format!("SIG{sig}"))),
            (None, None) => (-1, None),
        }
    }
    #[cfg(not(unix))]
    {
        (status.code().unwrap_or(-1), None)
    }
}