1use std::collections::BTreeMap;
36use std::path::PathBuf;
37use std::process::{Child, Stdio};
38use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
39use std::sync::{Arc, LazyLock, Mutex, OnceLock};
40use std::time::Duration;
41
42use harn_vm::VmValue;
43
44use harn_vm::process_sandbox;
45
46use crate::error::HostlibError;
47use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
48
/// Source of the numeric suffix in long-running handle ids (`hto-<process id hex>-<n>`).
/// Starts at 1 and is bumped once per spawned process.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
51
/// Cancellation flag shared between a handle's store entry and its waiter thread.
struct CancelState {
    /// Set (with `Release`) by `do_kill` before the process is killed. The waiter thread
    /// reads it (with `Acquire`) and, when set, does not report a completion.
    cancelled: AtomicBool,
}
58
/// `HANDLE_STORE` record for one long-running process.
struct HandleEntry {
    /// The spawned child until someone claims it. The waiter thread or a canceller
    /// `take()`s it, leaving `None`.
    child: Option<Child>,
    /// OS process id. Kept so the process (and its group) can still be killed after the
    /// `Child` has been taken by the waiter thread.
    pid: u32,
    /// Session that owns the handle; `cancel_session_handles` matches on it.
    session_id: String,
    /// Cancellation flag shared with the waiter thread.
    cancel_state: Arc<CancelState>,
}
69
/// Registry of live long-running handles.
#[derive(Default)]
struct HandleStore {
    /// Entries keyed by handle id (the `hto-…` string handed back to callers).
    entries: BTreeMap<String, HandleEntry>,
}
74
75static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
76 LazyLock::new(|| Mutex::new(HandleStore::default()));
77
/// Identifiers and metadata for a freshly spawned long-running process. It is returned to
/// the caller while the process keeps running in the background.
pub struct LongRunningHandleInfo {
    /// Command id (from `proc::next_command_id` when built by `spawn_long_running_with_options`).
    pub command_id: String,
    /// Handle id (`hto-<process id hex>-<counter>`); the key accepted by `cancel_handle`.
    pub handle_id: String,
    /// Spawn timestamp (from `proc::now_rfc3339` when built by `spawn_long_running_with_options`).
    pub started_at: String,
    /// OS process id of the spawned child.
    pub pid: u32,
    /// Process group id as reported by `proc::child_process_group_id`, if any.
    pub process_group_id: Option<u32>,
    /// Program and arguments joined with single spaces (no quoting applied).
    pub command_display: String,
}
94
95impl LongRunningHandleInfo {
96 pub fn into_handle_response(self) -> VmValue {
98 proc::running_response(
99 self.command_id,
100 self.handle_id,
101 self.pid,
102 self.process_group_id,
103 self.started_at,
104 self.command_display,
105 )
106 }
107}
108
109pub fn spawn_long_running(
114 builtin: &'static str,
115 program: String,
116 args: Vec<String>,
117 cwd: Option<PathBuf>,
118 env: BTreeMap<String, String>,
119 session_id: String,
120) -> Result<LongRunningHandleInfo, HostlibError> {
121 spawn_long_running_with_options(
122 builtin,
123 program,
124 args,
125 cwd,
126 env,
127 EnvMode::InheritClean,
128 CaptureConfig::default(),
129 session_id,
130 )
131}
132
133pub(crate) fn spawn_long_running_with_options(
134 builtin: &'static str,
135 program: String,
136 args: Vec<String>,
137 cwd: Option<PathBuf>,
138 env: BTreeMap<String, String>,
139 env_mode: EnvMode,
140 capture: CaptureConfig,
141 session_id: String,
142) -> Result<LongRunningHandleInfo, HostlibError> {
143 if program.is_empty() {
144 return Err(HostlibError::InvalidParameter {
145 builtin,
146 param: "argv",
147 message: "first element of argv must be a non-empty program name".to_string(),
148 });
149 }
150
151 let mut command =
152 process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
153 builtin,
154 message: format!("sandbox setup failed: {e:?}"),
155 })?;
156
157 if let Some(cwd_path) = cwd.as_ref() {
158 process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
159 builtin,
160 message: format!("sandbox cwd rejected: {e:?}"),
161 })?;
162 command.current_dir(cwd_path);
163 }
164
165 proc::configure_background_process_group(&mut command);
166
167 if matches!(env_mode, EnvMode::Replace) {
168 command.env_clear();
169 }
170 if !env.is_empty() {
171 for (key, value) in &env {
172 command.env(key, value);
173 }
174 }
175
176 command.stdout(Stdio::piped());
177 command.stderr(Stdio::piped());
178 command.stdin(Stdio::null());
179
180 let child = command.spawn().map_err(|e| {
181 if let Some(violation) = process_sandbox::process_spawn_error(&e) {
182 return HostlibError::Backend {
183 builtin,
184 message: format!("sandbox rejected spawn: {violation:?}"),
185 };
186 }
187 HostlibError::Backend {
188 builtin,
189 message: format!("spawn failed: {e}"),
190 }
191 })?;
192
193 let pid = child.id();
194 let process_group_id = proc::child_process_group_id(pid);
195 let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
196 let handle_id = format!("hto-{:x}-{id}", std::process::id());
197 let command_id = proc::next_command_id();
198 let started_at = proc::now_rfc3339();
199
200 let mut all_argv = vec![program.clone()];
201 all_argv.extend(args.iter().cloned());
202 let command_display = all_argv.join(" ");
203
204 let cancel_state = Arc::new(CancelState {
205 cancelled: AtomicBool::new(false),
206 });
207
208 {
209 let mut store = HANDLE_STORE
210 .lock()
211 .expect("long-running handle store poisoned");
212 store.entries.insert(
213 handle_id.clone(),
214 HandleEntry {
215 child: Some(child),
216 pid,
217 session_id: session_id.clone(),
218 cancel_state: cancel_state.clone(),
219 },
220 );
221 }
222
223 let waiter_command_id = command_id.clone();
224 let waiter_handle_id = handle_id.clone();
225 let waiter_session_id = session_id;
226 let waiter_started_at = started_at.clone();
227 let waiter_command_display = command_display.clone();
228 std::thread::Builder::new()
229 .name(format!("hto-waiter-{waiter_handle_id}"))
230 .spawn(move || {
231 waiter_thread(
232 waiter_command_id,
233 waiter_handle_id,
234 waiter_session_id,
235 cancel_state,
236 capture,
237 waiter_started_at,
238 process_group_id,
239 waiter_command_display,
240 );
241 })
242 .map_err(|e| HostlibError::Backend {
243 builtin,
244 message: format!("failed to spawn waiter thread: {e}"),
245 })?;
246
247 Ok(LongRunningHandleInfo {
248 command_id,
249 handle_id,
250 started_at,
251 pid,
252 process_group_id,
253 command_display,
254 })
255}
256
257fn waiter_thread(
259 command_id: String,
260 handle_id: String,
261 session_id: String,
262 cancel_state: Arc<CancelState>,
263 capture: CaptureConfig,
264 started_at: String,
265 process_group_id: Option<u32>,
266 command_display: String,
267) {
268 let waiter_start = std::time::Instant::now();
269
270 let mut child = {
273 let mut store = HANDLE_STORE
274 .lock()
275 .expect("long-running handle store poisoned");
276 match store.entries.get_mut(&handle_id) {
277 Some(entry) => match entry.child.take() {
278 Some(c) => c,
279 None => return, },
281 None => return, }
283 };
284
285 use std::io::Read;
287 let mut stdout_bytes = Vec::new();
288 let mut stderr_bytes = Vec::new();
289 let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
290 let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
291
292 if let Some(mut out) = child.stdout.take() {
293 std::thread::spawn(move || {
294 let _ = out.read_to_end(&mut stdout_bytes);
295 let _ = out_tx.send(stdout_bytes);
296 });
297 }
298 if let Some(mut err) = child.stderr.take() {
299 std::thread::spawn(move || {
300 let _ = err.read_to_end(&mut stderr_bytes);
301 let _ = err_tx.send(stderr_bytes);
302 });
303 }
304
305 let status = child.wait().ok();
306
307 let stdout = out_rx
308 .recv_timeout(Duration::from_secs(5))
309 .unwrap_or_default();
310 let stderr = err_rx
311 .recv_timeout(Duration::from_secs(5))
312 .unwrap_or_default();
313
314 {
316 let mut store = HANDLE_STORE
317 .lock()
318 .expect("long-running handle store poisoned");
319 store.entries.remove(&handle_id);
320 }
321
322 if cancel_state.cancelled.load(Ordering::Acquire) {
325 return;
326 }
327
328 let (exit_code, signal_name) = match status {
329 Some(s) => decode_exit_status(s),
330 None => (-1, Some("SIGKILL".to_string())),
332 };
333 let duration = waiter_start.elapsed();
334 let duration_ms = duration.as_millis() as i64;
335 let artifacts = match proc::persist_artifacts(&command_id, &stdout, &stderr, Some(&handle_id)) {
336 Ok(artifacts) => artifacts,
337 Err(_) => return,
338 };
339 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
340
341 let mut payload = serde_json::Map::new();
342 payload.insert(
343 "command_id".into(),
344 serde_json::Value::String(command_id.clone()),
345 );
346 payload.insert(
347 "status".into(),
348 serde_json::Value::String(CommandStatus::Completed.as_str().to_string()),
349 );
350 payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
351 payload.insert(
352 "command_or_op_descriptor".into(),
353 serde_json::Value::String(command_display),
354 );
355 payload.insert("started_at".into(), serde_json::Value::String(started_at));
356 payload.insert(
357 "ended_at".into(),
358 serde_json::Value::String(proc::now_rfc3339()),
359 );
360 payload.insert(
361 "duration_ms".into(),
362 serde_json::Value::Number(duration_ms.into()),
363 );
364 payload.insert(
365 "exit_code".into(),
366 serde_json::Value::Number(exit_code.into()),
367 );
368 payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
369 payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
370 payload.insert(
371 "output_path".into(),
372 serde_json::Value::String(artifacts.output_path.display().to_string()),
373 );
374 payload.insert(
375 "stdout_path".into(),
376 serde_json::Value::String(artifacts.stdout_path.display().to_string()),
377 );
378 payload.insert(
379 "stderr_path".into(),
380 serde_json::Value::String(artifacts.stderr_path.display().to_string()),
381 );
382 payload.insert(
383 "line_count".into(),
384 serde_json::Value::Number(artifacts.line_count.into()),
385 );
386 payload.insert(
387 "byte_count".into(),
388 serde_json::Value::Number(artifacts.byte_count.into()),
389 );
390 payload.insert(
391 "output_sha256".into(),
392 serde_json::Value::String(artifacts.output_sha256),
393 );
394 if let Some(pgid) = process_group_id {
395 payload.insert(
396 "process_group_id".into(),
397 serde_json::Value::Number((pgid as u64).into()),
398 );
399 }
400 if let Some(sig) = signal_name {
401 payload.insert("signal".into(), serde_json::Value::String(sig));
402 } else {
403 payload.insert("signal".into(), serde_json::Value::Null);
404 }
405
406 let content = serde_json::to_string(&payload).unwrap_or_default();
407 harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
408}
409
410pub fn cancel_handle(handle_id: &str) -> bool {
413 let (pid, child, cancel_state) = {
414 let mut store = HANDLE_STORE
415 .lock()
416 .expect("long-running handle store poisoned");
417 match store.entries.remove(handle_id) {
418 None => return false,
419 Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
420 }
421 };
422 do_kill(pid, child, cancel_state);
423 true
424}
425
426pub fn cancel_session_handles(session_id: &str) {
429 let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
430 let mut store = HANDLE_STORE
431 .lock()
432 .expect("long-running handle store poisoned");
433 let matching: Vec<String> = store
434 .entries
435 .iter()
436 .filter(|(_, e)| e.session_id == session_id)
437 .map(|(id, _)| id.clone())
438 .collect();
439 matching
440 .into_iter()
441 .filter_map(|id| {
442 store.entries.remove(&id).map(|mut e| {
443 let child = e.child.take();
444 (e.pid, child, e.cancel_state.clone())
445 })
446 })
447 .collect()
448 };
449 for (pid, child, cancel_state) in to_kill {
450 do_kill(pid, child, cancel_state);
451 }
452}
453
454fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
457 cancel_state.cancelled.store(true, Ordering::Release);
459 if let Some(mut c) = child {
460 kill_child(&mut c);
462 } else {
463 kill_pid_or_group(pid);
465 }
466}
467
468pub(crate) fn register_cleanup_hook() {
472 static REGISTERED: OnceLock<()> = OnceLock::new();
473 REGISTERED.get_or_init(|| {
474 let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
475 cancel_session_handles(session_id);
476 });
477 harn_vm::register_session_end_hook(hook);
478 });
479}
480
481fn kill_child(child: &mut Child) {
482 kill_pid_or_group(child.id());
483 let _ = child.kill();
484 let _ = child.wait();
485}
486
/// Best-effort `SIGKILL` of the process group whose id is `pid`, and of `pid` itself.
///
/// Errors from `kill(2)` (e.g. `ESRCH` when the group or process is already gone) are
/// ignored. On non-Unix targets this is a no-op.
///
/// A `pid` of 0, or one that does not fit in a positive `i32`, is ignored. Such values must
/// never reach the syscall:
/// - `kill(0, …)` signals the caller's own process group;
/// - a wrapped value such as `u32::MAX` turns into `kill(-1, …)`, which signals every
///   process the caller is allowed to signal.
fn kill_pid_or_group(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }
        /// `SIGKILL` has the same number on every Unix.
        const SIGKILL: i32 = 9;

        if pid == 0 || pid > i32::MAX as u32 {
            return;
        }
        let pid = pid as i32;
        // SAFETY: `kill` is the libc function with this exact C signature. It only takes
        // integers and touches no memory owned by Rust. `pid > 0`, so `-pid` addresses exactly
        // the process group with id `pid` and never a broadcast target.
        unsafe {
            kill(-pid, SIGKILL);
            kill(pid, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        // NOTE(review): no way to kill by pid here; only an owned `Child` can be killed.
        let _ = pid;
    }
}
507
/// Splits an `ExitStatus` into `(exit_code, signal_name)` for the completion payload.
///
/// - Normal exit: `(code, None)`.
/// - Unix, terminated by a signal: `(-1, Some(name))`. `name` is the conventional name
///   (e.g. `"SIGKILL"`, matching the waiter's wait-failure fallback) for signals whose number
///   is the same on all mainstream Unixes, and `"SIG<n>"` for any other signal.
/// - Unix, anything else (e.g. a stopped process): `(-1, None)`.
/// - Non-Unix: `(code, None)`, with `-1` when no code is available.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        if let Some(code) = status.code() {
            return (code, None);
        }
        if let Some(sig) = status.signal() {
            return (-1, Some(unix_signal_name(sig)));
        }
        (-1, None)
    }
    #[cfg(not(unix))]
    (status.code().unwrap_or(-1), None)
}

/// Conventional name of Unix signal `sig`, or `"SIG<n>"` when its number is not portable.
///
/// Only signals numbered identically on Linux and the BSDs/macOS are mapped. For example,
/// `SIGBUS` and `SIGUSR1` differ between platforms, so they fall back to the numeric form.
#[cfg(unix)]
fn unix_signal_name(sig: i32) -> String {
    let name = match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        8 => "SIGFPE",
        9 => "SIGKILL",
        11 => "SIGSEGV",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        _ => return format!("SIG{sig}"),
    };
    name.to_string()
}