Skip to main content

harn_hostlib/tools/
long_running.rs

1//! Long-running tool handle machinery.
2//!
3//! When a caller passes `long_running: true` to `run_command`, `run_test`, or
4//! `run_build_command`, the builtin spawns the child process without waiting,
5//! registers it here, and returns a handle dict immediately:
6//!
7//! ```json
8//! {
9//!   "handle_id": "hto-<pid-hex>-<n>",
10//!   "started_at": "...",
11//!   "command_or_op_descriptor": "..."
12//! }
13//! ```
14//!
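//! A background thread waits for the child and, when it exits, pushes a
//! `tool_result` entry into the `agent_inbox` of the session the handle was
//! spawned for, via `harn_vm::orchestration::agent_inbox::push(...)`, so the
//! agent loop's next turn-preflight (or post-compaction drain) picks it up.
//! When a progress interval is configured, periodic `tool_progress`
//! snapshots of the output captured so far are pushed the same way while the
//! child is still running.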
19//!
20//! ### Cancellation
21//!
22//! `cancel_handle(handle_id)` kills the spawned process (SIGKILL) within
23//! 2 seconds. The session-end hook registered on startup kills every
24//! in-flight handle associated with the ending session.
25//!
//! #### Killing after the waiter takes the process
//!
//! The waiter thread takes ownership of the process handle to drain
//! stdout/stderr and call `wait()`. To keep cancellation possible after that,
//! each store entry also holds an `Arc<dyn ProcessKiller>` obtained from the
//! handle at spawn time, and cancellation kills through it.
//! A shared `cancelled` flag suppresses the inbox push when the waiter sees
//! an exit caused by cancellation. Callers that need artifact-stable
//! cancellation can wait for the waiter's final result through
//! `cancel_handle_with_options` (`CancelOptions::wait_result`); the plain
//! `cancel_handle` does not wait.
37
38use std::collections::BTreeMap;
39use std::io::{Read, Write};
40use std::path::PathBuf;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use std::sync::{Arc, LazyLock, Mutex, OnceLock};
43use std::time::Duration;
44
45use harn_vm::VmValue;
46
47use crate::error::HostlibError;
48use crate::process::{self as process_handle, ProcessHandle, ProcessKiller, SpawnSpec};
49use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
50
51/// Atomic counter for generating unique handle IDs within this process.
52static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
53
54/// Shared cancellation state between the store entry and its waiter thread.
55struct CancelState {
56    /// Set to `true` when `cancel_handle` / `cancel_session_handles` runs.
57    /// The waiter checks this before pushing feedback.
58    cancelled: AtomicBool,
59    /// Set by cancellation paths that represent a timeout rather than a
60    /// user-requested kill. The waiter uses this for the returned result
61    /// status while still suppressing inbox feedback.
62    timed_out: AtomicBool,
63}
64
65#[derive(Default)]
66struct OutputState {
67    stdout: Vec<u8>,
68    stderr: Vec<u8>,
69}
70
71/// Shared state for a single in-flight child process.
72struct HandleEntry {
73    /// The process handle. `None` after the waiter thread takes ownership.
74    handle: Option<Box<dyn ProcessHandle>>,
75    /// Killer that works even after the waiter took `handle`.
76    killer: Arc<dyn ProcessKiller>,
77    session_id: String,
78    /// Shared with the waiter thread.
79    cancel_state: Arc<CancelState>,
80    /// Sender used by the waiter thread to signal that the post-exit
81    /// feedback push is complete. `None` if the test-side hasn't asked
82    /// to be notified.
83    completion_tx: Option<std::sync::mpsc::SyncSender<()>>,
84    /// Optional one-shot result channel installed by `cancel_handle` when a
85    /// caller wants cancellation to wait until artifacts have been drained.
86    result_tx: Option<std::sync::mpsc::SyncSender<VmValue>>,
87}
88
89#[derive(Default)]
90struct HandleStore {
91    entries: BTreeMap<String, HandleEntry>,
92}
93
94static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
95    LazyLock::new(|| Mutex::new(HandleStore::default()));
96
/// Metadata handed back as soon as a long-running spawn succeeds; the
/// calling builtin turns it into its response dict.
pub struct LongRunningHandleInfo {
    /// Command id, drawn from the same sequence as foreground command
    /// responses.
    pub command_id: String,
    /// Opaque handle id of the form `"hto-<pid-hex>-<n>"`.
    pub handle_id: String,
    /// RFC 3339 timestamp taken right after the spawn.
    pub started_at: String,
    /// OS process id of the child as reported by the platform.
    pub pid: u32,
    /// Process group id of the child, when the platform exposes one.
    pub process_group_id: Option<u32>,
    /// The argv joined with single spaces, for display only.
    pub command_display: String,
}
113
114pub(crate) struct LongRunningSpawnOptions {
115    pub(crate) env_mode: EnvMode,
116    pub(crate) capture: CaptureConfig,
117    pub(crate) session_id: String,
118    pub(crate) progress_interval: Option<Duration>,
119    pub(crate) progress_max_inline_bytes: usize,
120}
121
122struct WaiterContext {
123    command_id: String,
124    handle_id: String,
125    session_id: String,
126    started_at: String,
127    process_group_id: Option<u32>,
128    command_display: String,
129    progress_interval: Option<Duration>,
130    progress_max_inline_bytes: usize,
131}
132
133struct ProgressThreadContext {
134    command_id: String,
135    handle_id: String,
136    session_id: String,
137    started_at: String,
138    command_display: String,
139    process_group_id: Option<u32>,
140    output_path: PathBuf,
141    stdout_path: PathBuf,
142    stderr_path: PathBuf,
143    output_state: Arc<Mutex<OutputState>>,
144    cancel_state: Arc<CancelState>,
145    done: Arc<AtomicBool>,
146    started: std::time::Instant,
147    interval: Duration,
148    max_inline_bytes: usize,
149}
150
151impl LongRunningHandleInfo {
152    /// Convert into the standard handle response dict returned to the agent.
153    pub fn into_handle_response(self) -> VmValue {
154        proc::running_response(
155            self.command_id,
156            self.handle_id,
157            self.pid,
158            self.process_group_id,
159            self.started_at,
160            self.command_display,
161        )
162    }
163}
164
165/// Spawn the argv as a long-running child process and return a handle.
166///
167/// The background waiter pushes a `tool_result` entry into the active
168/// session's `agent_inbox` when the process exits so the next
169/// agent-loop turn sees the result.
170pub fn spawn_long_running(
171    builtin: &'static str,
172    program: String,
173    args: Vec<String>,
174    cwd: Option<PathBuf>,
175    env: BTreeMap<String, String>,
176    session_id: String,
177) -> Result<LongRunningHandleInfo, HostlibError> {
178    spawn_long_running_with_options(
179        builtin,
180        program,
181        args,
182        cwd,
183        env,
184        LongRunningSpawnOptions {
185            env_mode: EnvMode::InheritClean,
186            capture: CaptureConfig::default(),
187            session_id,
188            progress_interval: None,
189            progress_max_inline_bytes: CaptureConfig::default().max_inline_bytes,
190        },
191    )
192}
193
194pub(crate) fn spawn_long_running_with_options(
195    builtin: &'static str,
196    program: String,
197    args: Vec<String>,
198    cwd: Option<PathBuf>,
199    env: BTreeMap<String, String>,
200    options: LongRunningSpawnOptions,
201) -> Result<LongRunningHandleInfo, HostlibError> {
202    let spec = SpawnSpec {
203        builtin,
204        program: program.clone(),
205        args: args.clone(),
206        cwd,
207        env,
208        env_mode: options.env_mode,
209        use_stdin: false,
210        configure_process_group: true,
211    };
212    let handle = process_handle::spawn_process(spec)
213        .map_err(|e| proc::process_error_to_hostlib(builtin, e))?;
214
215    let pid = handle.pid().unwrap_or(0);
216    let process_group_id = handle.process_group_id();
217    let killer = handle.killer();
218    let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
219    let handle_id = format!("hto-{:x}-{id}", std::process::id());
220    let command_id = proc::next_command_id();
221    let started_at = proc::now_rfc3339();
222
223    let mut all_argv = vec![program];
224    all_argv.extend(args.iter().cloned());
225    let command_display = all_argv.join(" ");
226
227    let cancel_state = Arc::new(CancelState {
228        cancelled: AtomicBool::new(false),
229        timed_out: AtomicBool::new(false),
230    });
231
232    {
233        let mut store = HANDLE_STORE
234            .lock()
235            .expect("long-running handle store poisoned");
236        store.entries.insert(
237            handle_id.clone(),
238            HandleEntry {
239                handle: Some(handle),
240                killer,
241                session_id: options.session_id.clone(),
242                cancel_state: cancel_state.clone(),
243                completion_tx: None,
244                result_tx: None,
245            },
246        );
247    }
248
249    let waiter_context = WaiterContext {
250        command_id: command_id.clone(),
251        handle_id: handle_id.clone(),
252        session_id: options.session_id,
253        started_at: started_at.clone(),
254        process_group_id,
255        command_display: command_display.clone(),
256        progress_interval: options.progress_interval,
257        progress_max_inline_bytes: options.progress_max_inline_bytes,
258    };
259    let waiter_thread_name = waiter_context.handle_id.clone();
260    let capture = options.capture;
261    std::thread::Builder::new()
262        .name(format!("hto-waiter-{waiter_thread_name}"))
263        .spawn(move || {
264            waiter_thread(waiter_context, cancel_state, capture);
265        })
266        .map_err(|e| HostlibError::Backend {
267            builtin,
268            message: format!("failed to spawn waiter thread: {e}"),
269        })?;
270
271    Ok(LongRunningHandleInfo {
272        command_id,
273        handle_id,
274        started_at,
275        pid,
276        process_group_id,
277        command_display,
278    })
279}
280
281/// Background thread that waits for a child process and fires feedback.
282fn waiter_thread(context: WaiterContext, cancel_state: Arc<CancelState>, capture: CaptureConfig) {
283    let waiter_start = std::time::Instant::now();
284
285    // Take the handle out of the store. If the entry is already gone (i.e.
286    // cancel_handle ran and removed it before us), exit without action.
287    let mut handle = {
288        let mut store = HANDLE_STORE
289            .lock()
290            .expect("long-running handle store poisoned");
291        match store.entries.get_mut(&context.handle_id) {
292            Some(entry) => match entry.handle.take() {
293                Some(h) => h,
294                None => return, // already cancelled before we ran
295            },
296            None => return, // entry removed (cancelled before store insert — shouldn't happen)
297        }
298    };
299
300    let output_state = Arc::new(Mutex::new(OutputState::default()));
301    let done = Arc::new(AtomicBool::new(false));
302    let planned = proc::planned_artifact_paths(&context.command_id);
303    if let Some(parent) = planned.output_path.parent() {
304        let _ = std::fs::create_dir_all(parent);
305    }
306    let _ = std::fs::File::create(&planned.stdout_path);
307    let _ = std::fs::File::create(&planned.stderr_path);
308    let combined_file = std::fs::File::create(&planned.output_path)
309        .ok()
310        .map(|file| Arc::new(Mutex::new(file)));
311
312    let stdout_thread = handle.take_stdout().map(|out| {
313        spawn_output_drain(
314            out,
315            output_state.clone(),
316            planned.stdout_path.clone(),
317            combined_file.clone(),
318            true,
319        )
320    });
321    let stderr_thread = handle.take_stderr().map(|err| {
322        spawn_output_drain(
323            err,
324            output_state.clone(),
325            planned.stderr_path.clone(),
326            combined_file.clone(),
327            false,
328        )
329    });
330
331    let progress_thread = context
332        .progress_interval
333        .filter(|interval| !interval.is_zero())
334        .map(|interval| {
335            spawn_progress_thread(ProgressThreadContext {
336                command_id: context.command_id.clone(),
337                handle_id: context.handle_id.clone(),
338                session_id: context.session_id.clone(),
339                started_at: context.started_at.clone(),
340                command_display: context.command_display.clone(),
341                process_group_id: context.process_group_id,
342                output_path: planned.output_path.clone(),
343                stdout_path: planned.stdout_path.clone(),
344                stderr_path: planned.stderr_path.clone(),
345                output_state: output_state.clone(),
346                cancel_state: cancel_state.clone(),
347                done: done.clone(),
348                started: waiter_start,
349                interval,
350                max_inline_bytes: context.progress_max_inline_bytes,
351            })
352        });
353
354    let status = handle.wait().ok();
355
356    if let Some(thread) = stdout_thread {
357        let _ = thread.join();
358    }
359    if let Some(thread) = stderr_thread {
360        let _ = thread.join();
361    }
362    done.store(true, Ordering::Release);
363    drop(progress_thread);
364    let (stdout, stderr) = {
365        let state = output_state
366            .lock()
367            .unwrap_or_else(|poison| poison.into_inner());
368        (state.stdout.clone(), state.stderr.clone())
369    };
370
371    // Remove our entry from the store, taking notifiers on the way out so we
372    // can signal them after the feedback/result path completes.
373    let (completion_tx, result_tx) = {
374        let mut store = HANDLE_STORE
375            .lock()
376            .expect("long-running handle store poisoned");
377        let entry = store
378            .entries
379            .remove(&context.handle_id)
380            .map(|mut e| (e.completion_tx.take(), e.result_tx.take()));
381        entry.unwrap_or((None, None))
382    };
383
384    let signal_done = move || {
385        if let Some(tx) = completion_tx {
386            let _ = tx.try_send(());
387        }
388    };
389
390    let cancelled = cancel_state.cancelled.load(Ordering::Acquire);
391    let timed_out = cancelled && cancel_state.timed_out.load(Ordering::Acquire);
392
393    let (exit_code, signal_name) = match status {
394        Some(s) => decode_exit_status(s),
395        // wait() itself failed — treat as killed (extremely unusual).
396        None => (-1, Some("SIGKILL".to_string())),
397    };
398    let command_status = if timed_out {
399        CommandStatus::TimedOut
400    } else if cancelled {
401        CommandStatus::Killed
402    } else {
403        CommandStatus::Completed
404    };
405    let duration = waiter_start.elapsed();
406    let duration_ms = duration.as_millis() as i64;
407    let artifacts = match proc::persist_artifacts(
408        &context.command_id,
409        &stdout,
410        &stderr,
411        Some(&context.handle_id),
412    ) {
413        Ok(artifacts) => artifacts,
414        Err(_) => return,
415    };
416    let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
417
418    let mut payload = serde_json::Map::new();
419    payload.insert(
420        "command_id".into(),
421        serde_json::Value::String(context.command_id.clone()),
422    );
423    payload.insert(
424        "status".into(),
425        serde_json::Value::String(command_status.as_str().to_string()),
426    );
427    payload.insert(
428        "handle_id".into(),
429        serde_json::Value::String(context.handle_id),
430    );
431    payload.insert(
432        "command_or_op_descriptor".into(),
433        serde_json::Value::String(context.command_display),
434    );
435    payload.insert(
436        "started_at".into(),
437        serde_json::Value::String(context.started_at),
438    );
439    payload.insert(
440        "ended_at".into(),
441        serde_json::Value::String(proc::now_rfc3339()),
442    );
443    payload.insert(
444        "duration_ms".into(),
445        serde_json::Value::Number(duration_ms.into()),
446    );
447    payload.insert(
448        "exit_code".into(),
449        serde_json::Value::Number(exit_code.into()),
450    );
451    payload.insert("timed_out".into(), serde_json::Value::Bool(timed_out));
452    payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
453    payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
454    payload.insert(
455        "output_path".into(),
456        serde_json::Value::String(artifacts.output_path.display().to_string()),
457    );
458    payload.insert(
459        "stdout_path".into(),
460        serde_json::Value::String(artifacts.stdout_path.display().to_string()),
461    );
462    payload.insert(
463        "stderr_path".into(),
464        serde_json::Value::String(artifacts.stderr_path.display().to_string()),
465    );
466    payload.insert(
467        "line_count".into(),
468        serde_json::Value::Number(artifacts.line_count.into()),
469    );
470    payload.insert(
471        "byte_count".into(),
472        serde_json::Value::Number(artifacts.byte_count.into()),
473    );
474    payload.insert(
475        "output_sha256".into(),
476        serde_json::Value::String(artifacts.output_sha256),
477    );
478    if let Some(pgid) = context.process_group_id {
479        payload.insert(
480            "process_group_id".into(),
481            serde_json::Value::Number((pgid as u64).into()),
482        );
483    }
484    if let Some(sig) = signal_name {
485        payload.insert("signal".into(), serde_json::Value::String(sig));
486    } else {
487        payload.insert("signal".into(), serde_json::Value::Null);
488    }
489
490    if let Some(tx) = result_tx {
491        let value = serde_json::Value::Object(payload.clone());
492        let _ = tx.try_send(harn_vm::json_to_vm_value(&value));
493    }
494    if !cancelled {
495        let content = serde_json::to_string(&payload).unwrap_or_default();
496        harn_vm::orchestration::agent_inbox::push(
497            &context.session_id,
498            "tool_result",
499            &content,
500            "hostlib.long_running.exit",
501        );
502    }
503    signal_done();
504}
505
506fn spawn_output_drain(
507    mut reader: Box<dyn Read + Send>,
508    state: Arc<Mutex<OutputState>>,
509    path: std::path::PathBuf,
510    combined_file: Option<Arc<Mutex<std::fs::File>>>,
511    stdout: bool,
512) -> std::thread::JoinHandle<()> {
513    std::thread::spawn(move || {
514        let mut file = std::fs::File::create(path).ok();
515        let mut buf = [0_u8; 8192];
516        loop {
517            let read = match reader.read(&mut buf) {
518                Ok(0) => break,
519                Ok(read) => read,
520                Err(_) => break,
521            };
522            let chunk = &buf[..read];
523            if let Some(file) = file.as_mut() {
524                let _ = file.write_all(chunk);
525            }
526            if let Some(combined) = combined_file.as_ref() {
527                if let Ok(mut combined) = combined.lock() {
528                    let _ = combined.write_all(chunk);
529                }
530            }
531            if let Ok(mut state) = state.lock() {
532                if stdout {
533                    state.stdout.extend_from_slice(chunk);
534                } else {
535                    state.stderr.extend_from_slice(chunk);
536                }
537            }
538        }
539    })
540}
541
542fn spawn_progress_thread(context: ProgressThreadContext) -> std::thread::JoinHandle<()> {
543    std::thread::spawn(move || {
544        while !context.done.load(Ordering::Acquire)
545            && !context.cancel_state.cancelled.load(Ordering::Acquire)
546        {
547            std::thread::sleep(context.interval);
548            if context.done.load(Ordering::Acquire)
549                || context.cancel_state.cancelled.load(Ordering::Acquire)
550            {
551                break;
552            }
553            let (stdout, stderr) = {
554                let state = context
555                    .output_state
556                    .lock()
557                    .unwrap_or_else(|poison| poison.into_inner());
558                (state.stdout.clone(), state.stderr.clone())
559            };
560            let capture = CaptureConfig {
561                max_inline_bytes: context.max_inline_bytes,
562                ..CaptureConfig::default()
563            };
564            let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
565            let byte_count = stdout.len().saturating_add(stderr.len());
566            let payload = serde_json::json!({
567                "command_id": &context.command_id,
568                "handle_id": &context.handle_id,
569                "status": CommandStatus::Running.as_str(),
570                "command_or_op_descriptor": &context.command_display,
571                "started_at": &context.started_at,
572                "ended_at": null,
573                "duration_ms": context.started.elapsed().as_millis() as i64,
574                "exit_code": null,
575                "signal": null,
576                "stdout": inline_stdout,
577                "stderr": inline_stderr,
578                "output_path": context.output_path.display().to_string(),
579                "stdout_path": context.stdout_path.display().to_string(),
580                "stderr_path": context.stderr_path.display().to_string(),
581                "byte_count": byte_count as i64,
582                "line_count": stdout.iter().chain(stderr.iter()).filter(|byte| **byte == b'\n').count() as i64,
583                "process_group_id": context.process_group_id,
584            });
585            harn_vm::orchestration::agent_inbox::push(
586                &context.session_id,
587                "tool_progress",
588                &payload.to_string(),
589                "hostlib.long_running.progress",
590            );
591        }
592    })
593}
594
595pub(crate) struct CancelOptions {
596    pub(crate) timed_out: bool,
597    pub(crate) wait_result: Option<Duration>,
598}
599
600pub(crate) struct CancelOutcome {
601    pub(crate) cancelled: bool,
602    pub(crate) result: Option<VmValue>,
603}
604
605/// Cancel a specific in-flight long-running handle. Kills the process and lets
606/// the waiter drain output/artifacts. Returns `true` if the handle was found
607/// and cancellation was newly requested.
608pub fn cancel_handle(handle_id: &str) -> bool {
609    cancel_handle_with_options(
610        handle_id,
611        CancelOptions {
612            timed_out: false,
613            wait_result: None,
614        },
615    )
616    .cancelled
617}
618
619pub(crate) fn cancel_handle_with_options(handle_id: &str, options: CancelOptions) -> CancelOutcome {
620    let (killer, cancel_state, result_rx) = {
621        let mut store = HANDLE_STORE
622            .lock()
623            .expect("long-running handle store poisoned");
624        let Some(entry) = store.entries.get_mut(handle_id) else {
625            return CancelOutcome {
626                cancelled: false,
627                result: None,
628            };
629        };
630        if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
631            return CancelOutcome {
632                cancelled: false,
633                result: None,
634            };
635        }
636        entry
637            .cancel_state
638            .timed_out
639            .store(options.timed_out, Ordering::Release);
640        let result_rx = options.wait_result.map(|_| {
641            let (tx, rx) = std::sync::mpsc::sync_channel::<VmValue>(1);
642            entry.result_tx = Some(tx);
643            rx
644        });
645        (entry.killer.clone(), entry.cancel_state.clone(), result_rx)
646    };
647    do_kill(killer, cancel_state);
648    let result = match (options.wait_result, result_rx) {
649        (Some(timeout), Some(rx)) => rx.recv_timeout(timeout).ok(),
650        _ => None,
651    };
652    CancelOutcome {
653        cancelled: true,
654        result,
655    }
656}
657
658/// Tuple shape used by `cancel_session_handles` to drain entries while
659/// holding the store lock for as little as possible. Boxed-trait fields
660/// make it noisy to inline as an unnamed type.
661type SessionKillEntry = (Arc<dyn ProcessKiller>, Arc<CancelState>);
662
663/// Cancel all in-flight handles for a given session. Called by the
664/// session-end hook to avoid orphaned processes.
665pub fn cancel_session_handles(session_id: &str) {
666    let to_kill: Vec<SessionKillEntry> = {
667        let store = HANDLE_STORE
668            .lock()
669            .expect("long-running handle store poisoned");
670        let matching: Vec<String> = store
671            .entries
672            .iter()
673            .filter(|(_, e)| e.session_id == session_id)
674            .map(|(id, _)| id.clone())
675            .collect();
676        matching
677            .into_iter()
678            .filter_map(|id| {
679                let entry = store.entries.get(&id)?;
680                if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
681                    return None;
682                }
683                entry.cancel_state.timed_out.store(false, Ordering::Release);
684                Some((entry.killer.clone(), entry.cancel_state.clone()))
685            })
686            .collect()
687    };
688    for (killer, cancel_state) in to_kill {
689        do_kill(killer, cancel_state);
690    }
691}
692
693/// Set the cancellation flag and kill the process. Used by both `cancel_handle`
694/// and `cancel_session_handles`.
695fn do_kill(killer: Arc<dyn ProcessKiller>, cancel_state: Arc<CancelState>) {
696    // Kill via the handle's killer (works whether or not we still own
697    // the handle). The waiter owns process reaping and artifact finalization.
698    killer.kill();
699    cancel_state.cancelled.store(true, Ordering::Release);
700}
701
702/// Register the session-cleanup hook with harn-vm. Uses a `OnceLock` so the
703/// hook is registered exactly once even if `register_builtins` is called
704/// multiple times (e.g. in tests).
705pub(crate) fn register_cleanup_hook() {
706    static REGISTERED: OnceLock<()> = OnceLock::new();
707    REGISTERED.get_or_init(|| {
708        let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
709            cancel_session_handles(session_id);
710        });
711        harn_vm::register_session_end_hook(hook);
712    });
713}
714
715fn decode_exit_status(status: process_handle::ExitStatus) -> (i32, Option<String>) {
716    if let Some(code) = status.code {
717        return (code, None);
718    }
719    if let Some(sig) = status.signal {
720        return (-1, Some(format!("SIG{sig}")));
721    }
722    (-1, None)
723}
724
725/// Register a completion notifier for `handle_id`. The waiter thread sends
726/// `()` on the returned receiver after it pushes the feedback item to the
727/// global queue. Returns `None` if the handle is no longer in the store
728/// (e.g. already cancelled or completed). Used by tests to await waiter
729/// completion deterministically — no polling, no `thread::sleep`.
730pub fn register_completion_notifier(handle_id: &str) -> Option<std::sync::mpsc::Receiver<()>> {
731    let (tx, rx) = std::sync::mpsc::sync_channel::<()>(1);
732    let mut store = HANDLE_STORE
733        .lock()
734        .expect("long-running handle store poisoned");
735    let entry = store.entries.get_mut(handle_id)?;
736    entry.completion_tx = Some(tx);
737    Some(rx)
738}