1use std::collections::BTreeMap;
32use std::path::PathBuf;
33use std::process::{Child, Stdio};
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use std::sync::{Arc, LazyLock, Mutex, OnceLock};
36use std::time::Duration;
37
38use harn_vm::VmValue;
39
40use harn_vm::process_sandbox;
41
42use crate::error::HostlibError;
43use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
44
/// Source of the numeric suffix of handle ids (`hto-<hex host pid>-<n>`); starts at 1.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Cancellation flag shared between a handle's store entry and its waiter thread.
struct CancelState {
    /// Raised by `do_kill`; the waiter thread reads it to decide whether to report a result.
    cancelled: AtomicBool,
}

/// Book-keeping for one registered long-running process.
struct HandleEntry {
    /// Session that owns the handle; matched by `cancel_session_handles`.
    session_id: String,
    /// OS process id, kept so the process can still be signalled after `child` is taken.
    pid: u32,
    /// The child process, until the waiter thread or a cancel takes ownership of it.
    child: Option<Child>,
    /// Flag shared with the waiter thread for this handle.
    cancel_state: Arc<CancelState>,
}

/// Registry of live long-running handles, keyed by handle id.
struct HandleStore {
    entries: BTreeMap<String, HandleEntry>,
}

impl Default for HandleStore {
    fn default() -> Self {
        HandleStore {
            entries: BTreeMap::new(),
        }
    }
}

/// Process-wide handle registry, created empty on first use.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> = LazyLock::new(Default::default);
73
74pub struct LongRunningHandleInfo {
77 pub command_id: String,
79 pub handle_id: String,
81 pub started_at: String,
83 pub pid: u32,
85 pub process_group_id: Option<u32>,
87 pub command_display: String,
89}
90
91impl LongRunningHandleInfo {
92 pub fn into_handle_response(self) -> VmValue {
94 proc::running_response(
95 self.command_id,
96 self.handle_id,
97 self.pid,
98 self.process_group_id,
99 self.started_at,
100 self.command_display,
101 )
102 }
103}
104
105pub fn spawn_long_running(
110 builtin: &'static str,
111 program: String,
112 args: Vec<String>,
113 cwd: Option<PathBuf>,
114 env: BTreeMap<String, String>,
115 session_id: String,
116) -> Result<LongRunningHandleInfo, HostlibError> {
117 spawn_long_running_with_options(
118 builtin,
119 program,
120 args,
121 cwd,
122 env,
123 EnvMode::InheritClean,
124 CaptureConfig::default(),
125 session_id,
126 )
127}
128
129pub(crate) fn spawn_long_running_with_options(
130 builtin: &'static str,
131 program: String,
132 args: Vec<String>,
133 cwd: Option<PathBuf>,
134 env: BTreeMap<String, String>,
135 env_mode: EnvMode,
136 capture: CaptureConfig,
137 session_id: String,
138) -> Result<LongRunningHandleInfo, HostlibError> {
139 if program.is_empty() {
140 return Err(HostlibError::InvalidParameter {
141 builtin,
142 param: "argv",
143 message: "first element of argv must be a non-empty program name".to_string(),
144 });
145 }
146
147 let mut command =
148 process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
149 builtin,
150 message: format!("sandbox setup failed: {e:?}"),
151 })?;
152
153 if let Some(cwd_path) = cwd.as_ref() {
154 process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
155 builtin,
156 message: format!("sandbox cwd rejected: {e:?}"),
157 })?;
158 command.current_dir(cwd_path);
159 }
160
161 proc::configure_background_process_group(&mut command);
162
163 if matches!(env_mode, EnvMode::Replace) {
164 command.env_clear();
165 }
166 if !env.is_empty() {
167 for (key, value) in &env {
168 command.env(key, value);
169 }
170 }
171
172 command.stdout(Stdio::piped());
173 command.stderr(Stdio::piped());
174 command.stdin(Stdio::null());
175
176 let child = command.spawn().map_err(|e| {
177 if let Some(violation) = process_sandbox::process_spawn_error(&e) {
178 return HostlibError::Backend {
179 builtin,
180 message: format!("sandbox rejected spawn: {violation:?}"),
181 };
182 }
183 HostlibError::Backend {
184 builtin,
185 message: format!("spawn failed: {e}"),
186 }
187 })?;
188
189 let pid = child.id();
190 let process_group_id = proc::child_process_group_id(pid);
191 let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
192 let handle_id = format!("hto-{:x}-{id}", std::process::id());
193 let command_id = proc::next_command_id();
194 let started_at = proc::now_rfc3339();
195
196 let mut all_argv = vec![program.clone()];
197 all_argv.extend(args.iter().cloned());
198 let command_display = all_argv.join(" ");
199
200 let cancel_state = Arc::new(CancelState {
201 cancelled: AtomicBool::new(false),
202 });
203
204 {
205 let mut store = HANDLE_STORE
206 .lock()
207 .expect("long-running handle store poisoned");
208 store.entries.insert(
209 handle_id.clone(),
210 HandleEntry {
211 child: Some(child),
212 pid,
213 session_id: session_id.clone(),
214 cancel_state: cancel_state.clone(),
215 },
216 );
217 }
218
219 let waiter_command_id = command_id.clone();
220 let waiter_handle_id = handle_id.clone();
221 let waiter_session_id = session_id;
222 let waiter_started_at = started_at.clone();
223 std::thread::Builder::new()
224 .name(format!("hto-waiter-{waiter_handle_id}"))
225 .spawn(move || {
226 waiter_thread(
227 waiter_command_id,
228 waiter_handle_id,
229 waiter_session_id,
230 cancel_state,
231 capture,
232 waiter_started_at,
233 process_group_id,
234 );
235 })
236 .map_err(|e| HostlibError::Backend {
237 builtin,
238 message: format!("failed to spawn waiter thread: {e}"),
239 })?;
240
241 Ok(LongRunningHandleInfo {
242 command_id,
243 handle_id,
244 started_at,
245 pid,
246 process_group_id,
247 command_display,
248 })
249}
250
251fn waiter_thread(
253 command_id: String,
254 handle_id: String,
255 session_id: String,
256 cancel_state: Arc<CancelState>,
257 capture: CaptureConfig,
258 started_at: String,
259 process_group_id: Option<u32>,
260) {
261 let waiter_start = std::time::Instant::now();
262
263 let mut child = {
266 let mut store = HANDLE_STORE
267 .lock()
268 .expect("long-running handle store poisoned");
269 match store.entries.get_mut(&handle_id) {
270 Some(entry) => match entry.child.take() {
271 Some(c) => c,
272 None => return, },
274 None => return, }
276 };
277
278 use std::io::Read;
280 let mut stdout_bytes = Vec::new();
281 let mut stderr_bytes = Vec::new();
282 let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
283 let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
284
285 if let Some(mut out) = child.stdout.take() {
286 std::thread::spawn(move || {
287 let _ = out.read_to_end(&mut stdout_bytes);
288 let _ = out_tx.send(stdout_bytes);
289 });
290 }
291 if let Some(mut err) = child.stderr.take() {
292 std::thread::spawn(move || {
293 let _ = err.read_to_end(&mut stderr_bytes);
294 let _ = err_tx.send(stderr_bytes);
295 });
296 }
297
298 let status = child.wait().ok();
299
300 let stdout = out_rx
301 .recv_timeout(Duration::from_secs(5))
302 .unwrap_or_default();
303 let stderr = err_rx
304 .recv_timeout(Duration::from_secs(5))
305 .unwrap_or_default();
306
307 {
309 let mut store = HANDLE_STORE
310 .lock()
311 .expect("long-running handle store poisoned");
312 store.entries.remove(&handle_id);
313 }
314
315 if cancel_state.cancelled.load(Ordering::Acquire) {
318 return;
319 }
320
321 let (exit_code, signal_name) = match status {
322 Some(s) => decode_exit_status(s),
323 None => (-1, Some("SIGKILL".to_string())),
325 };
326 let duration = waiter_start.elapsed();
327 let duration_ms = duration.as_millis() as i64;
328 let artifacts = match proc::persist_artifacts(&command_id, &stdout, &stderr, Some(&handle_id)) {
329 Ok(artifacts) => artifacts,
330 Err(_) => return,
331 };
332 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
333
334 let mut payload = serde_json::Map::new();
335 payload.insert(
336 "command_id".into(),
337 serde_json::Value::String(command_id.clone()),
338 );
339 payload.insert(
340 "status".into(),
341 serde_json::Value::String(CommandStatus::Completed.as_str().to_string()),
342 );
343 payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
344 payload.insert("started_at".into(), serde_json::Value::String(started_at));
345 payload.insert(
346 "ended_at".into(),
347 serde_json::Value::String(proc::now_rfc3339()),
348 );
349 payload.insert(
350 "duration_ms".into(),
351 serde_json::Value::Number(duration_ms.into()),
352 );
353 payload.insert(
354 "exit_code".into(),
355 serde_json::Value::Number(exit_code.into()),
356 );
357 payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
358 payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
359 payload.insert(
360 "output_path".into(),
361 serde_json::Value::String(artifacts.output_path.display().to_string()),
362 );
363 payload.insert(
364 "stdout_path".into(),
365 serde_json::Value::String(artifacts.stdout_path.display().to_string()),
366 );
367 payload.insert(
368 "stderr_path".into(),
369 serde_json::Value::String(artifacts.stderr_path.display().to_string()),
370 );
371 payload.insert(
372 "line_count".into(),
373 serde_json::Value::Number(artifacts.line_count.into()),
374 );
375 payload.insert(
376 "byte_count".into(),
377 serde_json::Value::Number(artifacts.byte_count.into()),
378 );
379 payload.insert(
380 "output_sha256".into(),
381 serde_json::Value::String(artifacts.output_sha256),
382 );
383 if let Some(pgid) = process_group_id {
384 payload.insert(
385 "process_group_id".into(),
386 serde_json::Value::Number((pgid as u64).into()),
387 );
388 }
389 if let Some(sig) = signal_name {
390 payload.insert("signal".into(), serde_json::Value::String(sig));
391 } else {
392 payload.insert("signal".into(), serde_json::Value::Null);
393 }
394
395 let content = serde_json::to_string(&payload).unwrap_or_default();
396 harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
397}
398
399pub fn cancel_handle(handle_id: &str) -> bool {
402 let (pid, child, cancel_state) = {
403 let mut store = HANDLE_STORE
404 .lock()
405 .expect("long-running handle store poisoned");
406 match store.entries.remove(handle_id) {
407 None => return false,
408 Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
409 }
410 };
411 do_kill(pid, child, cancel_state);
412 true
413}
414
415pub fn cancel_session_handles(session_id: &str) {
418 let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
419 let mut store = HANDLE_STORE
420 .lock()
421 .expect("long-running handle store poisoned");
422 let matching: Vec<String> = store
423 .entries
424 .iter()
425 .filter(|(_, e)| e.session_id == session_id)
426 .map(|(id, _)| id.clone())
427 .collect();
428 matching
429 .into_iter()
430 .filter_map(|id| {
431 store.entries.remove(&id).map(|mut e| {
432 let child = e.child.take();
433 (e.pid, child, e.cancel_state.clone())
434 })
435 })
436 .collect()
437 };
438 for (pid, child, cancel_state) in to_kill {
439 do_kill(pid, child, cancel_state);
440 }
441}
442
443fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
446 cancel_state.cancelled.store(true, Ordering::Release);
448 if let Some(mut c) = child {
449 kill_child(&mut c);
451 } else {
452 kill_pid_or_group(pid);
454 }
455}
456
457pub(crate) fn register_cleanup_hook() {
461 static REGISTERED: OnceLock<()> = OnceLock::new();
462 REGISTERED.get_or_init(|| {
463 let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
464 cancel_session_handles(session_id);
465 });
466 harn_vm::register_session_end_hook(hook);
467 });
468}
469
470fn kill_child(child: &mut Child) {
471 kill_pid_or_group(child.id());
472 let _ = child.kill();
473 let _ = child.wait();
474}
475
/// Sends `SIGKILL` to the process group whose id is `pid`, then to `pid` itself.
///
/// Best-effort: failures of `kill(2)` (e.g. `ESRCH` when the group or process is already
/// gone) are ignored. Values of `pid` that cannot name a single ordinary process are
/// ignored: 0, 1, and anything not representable as a positive `i32`. Negating them would
/// turn the call into a broadcast: `kill(0, …)` hits the caller's own process group and
/// `kill(-1, …)` every process the caller may signal.
///
/// On non-Unix targets this does nothing.
/// NOTE(review): a handle whose `Child` is owned by the waiter thread therefore cannot be
/// killed by pid there.
fn kill_pid_or_group(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }
        // SIGKILL is signal number 9 on every POSIX system.
        const SIGKILL: i32 = 9;

        let Ok(pid) = i32::try_from(pid) else {
            return;
        };
        if pid <= 1 {
            return;
        }
        // SAFETY: `kill` takes two plain integers and touches no Rust-managed memory;
        // `pid > 1`, so neither `pid` nor `-pid` is one of the broadcast targets 0 / -1.
        unsafe {
            kill(-pid, SIGKILL);
            kill(pid, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
    }
}
496
/// Splits an exit status into `(exit_code, signal_name)`.
///
/// On Unix, a normal exit yields `(code, None)`. A signal death yields `(-1, Some(name))`,
/// where `name` is the conventional `SIG…` name for the portable POSIX signal numbers
/// (e.g. `"SIGKILL"`, matching the name used elsewhere for killed processes) and
/// `"SIG<n>"` for any other number. If neither is available, the result is `(-1, None)`.
/// On other targets, the result is `(code or -1, None)`.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        if let Some(code) = status.code() {
            return (code, None);
        }
        (-1, status.signal().map(unix_signal_name))
    }
    #[cfg(not(unix))]
    {
        (status.code().unwrap_or(-1), None)
    }
}

/// Name for a Unix signal number. Only numbers that are identical on Linux, macOS and the
/// BSDs are named; anything else falls back to `SIG<n>`.
#[cfg(unix)]
fn unix_signal_name(sig: i32) -> String {
    let name = match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        8 => "SIGFPE",
        9 => "SIGKILL",
        11 => "SIGSEGV",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        _ => return format!("SIG{sig}"),
    };
    name.to_string()
}