Skip to main content

harn_hostlib/tools/
long_running.rs

1//! Long-running tool handle machinery.
2//!
3//! When a caller passes `long_running: true` to `run_command`, `run_test`, or
4//! `run_build_command`, the builtin spawns the child process without waiting,
5//! registers it here, and returns a handle dict immediately:
6//!
7//! ```json
8//! {
9//!   "handle_id": "hto-<pid-hex>-<n>",
10//!   "started_at": "...",
11//!   "command_or_op_descriptor": "..."
12//! }
13//! ```
14//!
15//! A background thread waits for the child and, when it exits, pushes a
16//! `tool_result` entry into the active session's `agent_inbox` via
17//! `harn_vm::orchestration::agent_inbox::push(...)` so the agent-loop's
18//! next turn-preflight (or post-compaction drain) picks it up.
19//!
20//! ### Cancellation
21//!
22//! `cancel_handle(handle_id)` kills the spawned process (SIGKILL) within
23//! 2 seconds. The session-end hook registered on startup kills every
24//! in-flight handle associated with the ending session.
25//!
//! #### Killer-based signaling
//!
//! The waiter thread takes ownership of the process handle to drain
//! stdout/stderr and call `wait()`. To keep cancellation possible even
//! after the waiter has taken the handle, each store entry keeps a
//! `ProcessKiller` obtained from the handle at spawn time, and every
//! cancellation path kills through it. How the killer reaches the process
//! (for example `kill(2)` on Unix) is decided by `crate::process`.
//! A shared `cancelled` flag suppresses the feedback push when the waiter
//! sees an exit caused by cancellation. Callers that need artifact-stable
//! cancellation can opt into waiting for the waiter result through
//! `cancel_handle_with_options` (`CancelOptions::wait_result`).
37
38use std::collections::BTreeMap;
39use std::io::{Read, Write};
40use std::path::PathBuf;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use std::sync::{Arc, LazyLock, Mutex, OnceLock};
43use std::time::Duration;
44
45use harn_vm::VmValue;
46
47use crate::error::HostlibError;
48use crate::process::{self as process_handle, ProcessHandle, ProcessKiller, SpawnSpec};
49use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
50
/// Atomic counter that supplies the `<n>` suffix of each handle ID.
///
/// Starts at 1 and is advanced with `fetch_add` each time a handle ID is
/// minted, so values are unique within this process.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
53
/// Cancellation flags shared (via `Arc`) between a store entry, its waiter
/// thread, and its optional progress thread.
struct CancelState {
    /// Set to `true` when `cancel_handle_with_options` or
    /// `cancel_session_handles` runs. The waiter checks this before pushing
    /// feedback, and the progress thread stops once it observes it.
    cancelled: AtomicBool,
    /// Set by cancellation paths that represent a timeout rather than a
    /// user-requested kill. The waiter uses this for the returned result
    /// status while still suppressing inbox feedback. Only consulted when
    /// `cancelled` is also set.
    timed_out: AtomicBool,
}
64
/// Bytes captured so far from the child. The drain threads append to these
/// buffers; the waiter and progress threads clone them to build payloads.
#[derive(Default)]
struct OutputState {
    /// Everything read from the child's stdout so far.
    stdout: Vec<u8>,
    /// Everything read from the child's stderr so far.
    stderr: Vec<u8>,
}
70
/// Store-side state for a single in-flight child process.
struct HandleEntry {
    /// The process handle. `None` after the waiter thread takes ownership.
    handle: Option<Box<dyn ProcessHandle>>,
    /// Killer that works even after the waiter took `handle`.
    killer: Arc<dyn ProcessKiller>,
    /// Session that spawned the process; matched by `cancel_session_handles`.
    session_id: String,
    /// Shared with the waiter thread.
    cancel_state: Arc<CancelState>,
    /// Sender used by the waiter thread to signal that its post-exit
    /// handling is complete. `None` unless `register_completion_notifier`
    /// installed one (used by tests).
    completion_tx: Option<std::sync::mpsc::SyncSender<()>>,
    /// Optional one-shot result channel installed by
    /// `cancel_handle_with_options` when a caller wants cancellation to wait
    /// until artifacts have been drained.
    result_tx: Option<std::sync::mpsc::SyncSender<VmValue>>,
}
88
/// Registry of in-flight long-running handles.
#[derive(Default)]
struct HandleStore {
    /// Entries keyed by handle ID (the `hto-...` string returned to callers).
    entries: BTreeMap<String, HandleEntry>,
}
93
/// Process-wide handle registry, created lazily on first use. Spawn, waiter,
/// and cancellation paths all lock this mutex to read or update entries.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
    LazyLock::new(|| Mutex::new(HandleStore::default()));
96
/// Metadata returned to the caller immediately when a long-running spawn
/// succeeds. Serialised as a response dict by the calling builtin (see
/// `into_handle_response`).
pub struct LongRunningHandleInfo {
    /// Command identifier shared with foreground command responses.
    pub command_id: String,
    /// Opaque handle identifier of the form `"hto-<pid-hex>-<n>"`, where the
    /// hex part is this host process's id and `<n>` is a per-process counter.
    pub handle_id: String,
    /// RFC 3339 timestamp of the spawn.
    pub started_at: String,
    /// Raw child process id reported by the platform; `0` when the process
    /// handle did not report one.
    pub pid: u32,
    /// Child process group id when the platform exposes it.
    pub process_group_id: Option<u32>,
    /// Human-readable display form of the argv (program and arguments,
    /// space-joined).
    pub command_display: String,
}
113
/// Knobs for `spawn_long_running_with_options` beyond the argv/cwd/env.
pub(crate) struct LongRunningSpawnOptions {
    /// Environment mode forwarded to toolchain-path setup and the spawn spec.
    pub(crate) env_mode: EnvMode,
    /// Capture settings used to inline stdout/stderr into the final payload.
    pub(crate) capture: CaptureConfig,
    /// Session whose inbox receives progress and result entries.
    pub(crate) session_id: String,
    /// Interval between `tool_progress` pushes; `None` or a zero duration
    /// disables the progress thread.
    pub(crate) progress_interval: Option<Duration>,
    /// Inline-output byte limit applied to progress payloads.
    pub(crate) progress_max_inline_bytes: usize,
}
121
/// Owned data handed to the waiter thread when it is spawned.
struct WaiterContext {
    /// Command identifier; also used to derive the artifact paths.
    command_id: String,
    /// Key of this process's entry in the handle store.
    handle_id: String,
    /// Session whose inbox receives the final `tool_result`.
    session_id: String,
    /// Spawn timestamp echoed back in payloads.
    started_at: String,
    /// Process group id, included in payloads when present.
    process_group_id: Option<u32>,
    /// Space-joined argv echoed back as `command_or_op_descriptor`.
    command_display: String,
    /// Progress push interval; `None` or zero means no progress thread.
    progress_interval: Option<Duration>,
    /// Inline-output byte limit for progress payloads.
    progress_max_inline_bytes: usize,
}
132
/// Everything the progress thread needs to build periodic `tool_progress`
/// payloads without touching the handle store.
struct ProgressThreadContext {
    /// Command identifier reported in each payload.
    command_id: String,
    /// Handle identifier reported in each payload.
    handle_id: String,
    /// Session whose inbox receives the progress entries.
    session_id: String,
    /// Spawn timestamp reported in each payload.
    started_at: String,
    /// Space-joined argv reported as `command_or_op_descriptor`.
    command_display: String,
    /// Process group id reported in each payload (JSON null when absent).
    process_group_id: Option<u32>,
    /// Planned path of the combined output artifact.
    output_path: PathBuf,
    /// Planned path of the stdout artifact.
    stdout_path: PathBuf,
    /// Planned path of the stderr artifact.
    stderr_path: PathBuf,
    /// Live capture buffers shared with the drain threads.
    output_state: Arc<Mutex<OutputState>>,
    /// Shared cancellation flags; the thread stops once `cancelled` is set.
    cancel_state: Arc<CancelState>,
    /// Set by the waiter after the child exited and output was drained.
    done: Arc<AtomicBool>,
    /// Start instant used to compute `duration_ms`.
    started: std::time::Instant,
    /// Sleep between progress pushes.
    interval: Duration,
    /// Inline-output byte limit for the payload's stdout/stderr.
    max_inline_bytes: usize,
}
150
151impl LongRunningHandleInfo {
152    /// Convert into the standard handle response dict returned to the agent.
153    pub fn into_handle_response(self) -> VmValue {
154        proc::running_response(
155            self.command_id,
156            self.handle_id,
157            self.pid,
158            self.process_group_id,
159            self.started_at,
160            self.command_display,
161        )
162    }
163}
164
165/// Spawn the argv as a long-running child process and return a handle.
166///
167/// The background waiter pushes a `tool_result` entry into the active
168/// session's `agent_inbox` when the process exits so the next
169/// agent-loop turn sees the result.
170pub fn spawn_long_running(
171    builtin: &'static str,
172    program: String,
173    args: Vec<String>,
174    cwd: Option<PathBuf>,
175    env: BTreeMap<String, String>,
176    session_id: String,
177) -> Result<LongRunningHandleInfo, HostlibError> {
178    spawn_long_running_with_options(
179        builtin,
180        program,
181        args,
182        cwd,
183        env,
184        LongRunningSpawnOptions {
185            env_mode: EnvMode::InheritClean,
186            capture: CaptureConfig::default(),
187            session_id,
188            progress_interval: None,
189            progress_max_inline_bytes: CaptureConfig::default().max_inline_bytes,
190        },
191    )
192}
193
194pub(crate) fn spawn_long_running_with_options(
195    builtin: &'static str,
196    program: String,
197    args: Vec<String>,
198    cwd: Option<PathBuf>,
199    env: BTreeMap<String, String>,
200    options: LongRunningSpawnOptions,
201) -> Result<LongRunningHandleInfo, HostlibError> {
202    let mut env = env;
203    proc::apply_toolchain_path(cwd.as_deref(), &mut env, options.env_mode);
204    let spec = SpawnSpec {
205        builtin,
206        program: program.clone(),
207        args: args.clone(),
208        cwd,
209        env,
210        env_mode: options.env_mode,
211        use_stdin: false,
212        configure_process_group: true,
213    };
214    let handle = process_handle::spawn_process(spec)
215        .map_err(|e| proc::process_error_to_hostlib(builtin, e))?;
216
217    let pid = handle.pid().unwrap_or(0);
218    let process_group_id = handle.process_group_id();
219    let killer = handle.killer();
220    let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
221    let handle_id = format!("hto-{:x}-{id}", std::process::id());
222    let command_id = proc::next_command_id();
223    let started_at = proc::now_rfc3339();
224
225    let mut all_argv = vec![program];
226    all_argv.extend(args.iter().cloned());
227    let command_display = all_argv.join(" ");
228
229    let cancel_state = Arc::new(CancelState {
230        cancelled: AtomicBool::new(false),
231        timed_out: AtomicBool::new(false),
232    });
233
234    {
235        let mut store = HANDLE_STORE
236            .lock()
237            .expect("long-running handle store poisoned");
238        store.entries.insert(
239            handle_id.clone(),
240            HandleEntry {
241                handle: Some(handle),
242                killer,
243                session_id: options.session_id.clone(),
244                cancel_state: cancel_state.clone(),
245                completion_tx: None,
246                result_tx: None,
247            },
248        );
249    }
250
251    let waiter_context = WaiterContext {
252        command_id: command_id.clone(),
253        handle_id: handle_id.clone(),
254        session_id: options.session_id,
255        started_at: started_at.clone(),
256        process_group_id,
257        command_display: command_display.clone(),
258        progress_interval: options.progress_interval,
259        progress_max_inline_bytes: options.progress_max_inline_bytes,
260    };
261    let waiter_thread_name = waiter_context.handle_id.clone();
262    let capture = options.capture;
263    std::thread::Builder::new()
264        .name(format!("hto-waiter-{waiter_thread_name}"))
265        .spawn(move || {
266            waiter_thread(waiter_context, cancel_state, capture);
267        })
268        .map_err(|e| HostlibError::Backend {
269            builtin,
270            message: format!("failed to spawn waiter thread: {e}"),
271        })?;
272
273    Ok(LongRunningHandleInfo {
274        command_id,
275        handle_id,
276        started_at,
277        pid,
278        process_group_id,
279        command_display,
280    })
281}
282
283/// Background thread that waits for a child process and fires feedback.
284fn waiter_thread(context: WaiterContext, cancel_state: Arc<CancelState>, capture: CaptureConfig) {
285    let waiter_start = std::time::Instant::now();
286
287    // Take the handle out of the store. If the entry is already gone (i.e.
288    // cancel_handle ran and removed it before us), exit without action.
289    let mut handle = {
290        let mut store = HANDLE_STORE
291            .lock()
292            .expect("long-running handle store poisoned");
293        match store.entries.get_mut(&context.handle_id) {
294            Some(entry) => match entry.handle.take() {
295                Some(h) => h,
296                None => return, // already cancelled before we ran
297            },
298            None => return, // entry removed (cancelled before store insert — shouldn't happen)
299        }
300    };
301
302    let output_state = Arc::new(Mutex::new(OutputState::default()));
303    let done = Arc::new(AtomicBool::new(false));
304    let planned = proc::planned_artifact_paths(&context.command_id);
305    if let Some(parent) = planned.output_path.parent() {
306        let _ = std::fs::create_dir_all(parent);
307    }
308    let _ = std::fs::File::create(&planned.stdout_path);
309    let _ = std::fs::File::create(&planned.stderr_path);
310    let combined_file = std::fs::File::create(&planned.output_path)
311        .ok()
312        .map(|file| Arc::new(Mutex::new(file)));
313
314    let stdout_thread = handle.take_stdout().map(|out| {
315        spawn_output_drain(
316            out,
317            output_state.clone(),
318            planned.stdout_path.clone(),
319            combined_file.clone(),
320            true,
321        )
322    });
323    let stderr_thread = handle.take_stderr().map(|err| {
324        spawn_output_drain(
325            err,
326            output_state.clone(),
327            planned.stderr_path.clone(),
328            combined_file.clone(),
329            false,
330        )
331    });
332
333    let progress_thread = context
334        .progress_interval
335        .filter(|interval| !interval.is_zero())
336        .map(|interval| {
337            spawn_progress_thread(ProgressThreadContext {
338                command_id: context.command_id.clone(),
339                handle_id: context.handle_id.clone(),
340                session_id: context.session_id.clone(),
341                started_at: context.started_at.clone(),
342                command_display: context.command_display.clone(),
343                process_group_id: context.process_group_id,
344                output_path: planned.output_path.clone(),
345                stdout_path: planned.stdout_path.clone(),
346                stderr_path: planned.stderr_path.clone(),
347                output_state: output_state.clone(),
348                cancel_state: cancel_state.clone(),
349                done: done.clone(),
350                started: waiter_start,
351                interval,
352                max_inline_bytes: context.progress_max_inline_bytes,
353            })
354        });
355
356    let status = handle.wait().ok();
357
358    if let Some(thread) = stdout_thread {
359        let _ = thread.join();
360    }
361    if let Some(thread) = stderr_thread {
362        let _ = thread.join();
363    }
364    done.store(true, Ordering::Release);
365    drop(progress_thread);
366    let (stdout, stderr) = {
367        let state = output_state
368            .lock()
369            .unwrap_or_else(|poison| poison.into_inner());
370        (state.stdout.clone(), state.stderr.clone())
371    };
372
373    // Remove our entry from the store, taking notifiers on the way out so we
374    // can signal them after the feedback/result path completes.
375    let (completion_tx, result_tx) = {
376        let mut store = HANDLE_STORE
377            .lock()
378            .expect("long-running handle store poisoned");
379        let entry = store
380            .entries
381            .remove(&context.handle_id)
382            .map(|mut e| (e.completion_tx.take(), e.result_tx.take()));
383        entry.unwrap_or((None, None))
384    };
385
386    let signal_done = move || {
387        if let Some(tx) = completion_tx {
388            let _ = tx.try_send(());
389        }
390    };
391
392    let cancelled = cancel_state.cancelled.load(Ordering::Acquire);
393    let timed_out = cancelled && cancel_state.timed_out.load(Ordering::Acquire);
394
395    let (exit_code, signal_name) = match status {
396        Some(s) => decode_exit_status(s),
397        // wait() itself failed — treat as killed (extremely unusual).
398        None => (-1, Some("SIGKILL".to_string())),
399    };
400    let command_status = if timed_out {
401        CommandStatus::TimedOut
402    } else if cancelled {
403        CommandStatus::Killed
404    } else {
405        CommandStatus::Completed
406    };
407    let duration = waiter_start.elapsed();
408    let duration_ms = duration.as_millis() as i64;
409    let artifacts = match proc::persist_artifacts(
410        &context.command_id,
411        &stdout,
412        &stderr,
413        Some(&context.handle_id),
414    ) {
415        Ok(artifacts) => artifacts,
416        Err(_) => return,
417    };
418    let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
419
420    let mut payload = serde_json::Map::new();
421    payload.insert(
422        "command_id".into(),
423        serde_json::Value::String(context.command_id.clone()),
424    );
425    payload.insert(
426        "status".into(),
427        serde_json::Value::String(command_status.as_str().to_string()),
428    );
429    payload.insert(
430        "handle_id".into(),
431        serde_json::Value::String(context.handle_id),
432    );
433    payload.insert(
434        "command_or_op_descriptor".into(),
435        serde_json::Value::String(context.command_display),
436    );
437    payload.insert(
438        "started_at".into(),
439        serde_json::Value::String(context.started_at),
440    );
441    payload.insert(
442        "ended_at".into(),
443        serde_json::Value::String(proc::now_rfc3339()),
444    );
445    payload.insert(
446        "duration_ms".into(),
447        serde_json::Value::Number(duration_ms.into()),
448    );
449    payload.insert(
450        "exit_code".into(),
451        serde_json::Value::Number(exit_code.into()),
452    );
453    payload.insert("timed_out".into(), serde_json::Value::Bool(timed_out));
454    payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
455    payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
456    payload.insert(
457        "output_path".into(),
458        serde_json::Value::String(artifacts.output_path.display().to_string()),
459    );
460    payload.insert(
461        "stdout_path".into(),
462        serde_json::Value::String(artifacts.stdout_path.display().to_string()),
463    );
464    payload.insert(
465        "stderr_path".into(),
466        serde_json::Value::String(artifacts.stderr_path.display().to_string()),
467    );
468    payload.insert(
469        "line_count".into(),
470        serde_json::Value::Number(artifacts.line_count.into()),
471    );
472    payload.insert(
473        "byte_count".into(),
474        serde_json::Value::Number(artifacts.byte_count.into()),
475    );
476    payload.insert(
477        "output_sha256".into(),
478        serde_json::Value::String(artifacts.output_sha256),
479    );
480    if let Some(pgid) = context.process_group_id {
481        payload.insert(
482            "process_group_id".into(),
483            serde_json::Value::Number((pgid as u64).into()),
484        );
485    }
486    if let Some(sig) = signal_name {
487        payload.insert("signal".into(), serde_json::Value::String(sig));
488    } else {
489        payload.insert("signal".into(), serde_json::Value::Null);
490    }
491
492    if let Some(tx) = result_tx {
493        let value = serde_json::Value::Object(payload.clone());
494        let _ = tx.try_send(harn_vm::json_to_vm_value(&value));
495    }
496    if !cancelled {
497        let content = serde_json::to_string(&payload).unwrap_or_default();
498        harn_vm::orchestration::agent_inbox::push(
499            &context.session_id,
500            "tool_result",
501            &content,
502            "hostlib.long_running.exit",
503        );
504    }
505    signal_done();
506}
507
508fn spawn_output_drain(
509    mut reader: Box<dyn Read + Send>,
510    state: Arc<Mutex<OutputState>>,
511    path: std::path::PathBuf,
512    combined_file: Option<Arc<Mutex<std::fs::File>>>,
513    stdout: bool,
514) -> std::thread::JoinHandle<()> {
515    std::thread::spawn(move || {
516        let mut file = std::fs::File::create(path).ok();
517        let mut buf = [0_u8; 8192];
518        loop {
519            let read = match reader.read(&mut buf) {
520                Ok(0) => break,
521                Ok(read) => read,
522                Err(_) => break,
523            };
524            let chunk = &buf[..read];
525            if let Some(file) = file.as_mut() {
526                let _ = file.write_all(chunk);
527            }
528            if let Some(combined) = combined_file.as_ref() {
529                if let Ok(mut combined) = combined.lock() {
530                    let _ = combined.write_all(chunk);
531                }
532            }
533            if let Ok(mut state) = state.lock() {
534                if stdout {
535                    state.stdout.extend_from_slice(chunk);
536                } else {
537                    state.stderr.extend_from_slice(chunk);
538                }
539            }
540        }
541    })
542}
543
544fn spawn_progress_thread(context: ProgressThreadContext) -> std::thread::JoinHandle<()> {
545    std::thread::spawn(move || {
546        while !context.done.load(Ordering::Acquire)
547            && !context.cancel_state.cancelled.load(Ordering::Acquire)
548        {
549            std::thread::sleep(context.interval);
550            if context.done.load(Ordering::Acquire)
551                || context.cancel_state.cancelled.load(Ordering::Acquire)
552            {
553                break;
554            }
555            let (stdout, stderr) = {
556                let state = context
557                    .output_state
558                    .lock()
559                    .unwrap_or_else(|poison| poison.into_inner());
560                (state.stdout.clone(), state.stderr.clone())
561            };
562            let capture = CaptureConfig {
563                max_inline_bytes: context.max_inline_bytes,
564                ..CaptureConfig::default()
565            };
566            let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
567            let byte_count = stdout.len().saturating_add(stderr.len());
568            let payload = serde_json::json!({
569                "command_id": &context.command_id,
570                "handle_id": &context.handle_id,
571                "status": CommandStatus::Running.as_str(),
572                "command_or_op_descriptor": &context.command_display,
573                "started_at": &context.started_at,
574                "ended_at": null,
575                "duration_ms": context.started.elapsed().as_millis() as i64,
576                "exit_code": null,
577                "signal": null,
578                "stdout": inline_stdout,
579                "stderr": inline_stderr,
580                "output_path": context.output_path.display().to_string(),
581                "stdout_path": context.stdout_path.display().to_string(),
582                "stderr_path": context.stderr_path.display().to_string(),
583                "byte_count": byte_count as i64,
584                "line_count": stdout.iter().chain(stderr.iter()).filter(|byte| **byte == b'\n').count() as i64,
585                "process_group_id": context.process_group_id,
586            });
587            harn_vm::orchestration::agent_inbox::push(
588                &context.session_id,
589                "tool_progress",
590                &payload.to_string(),
591                "hostlib.long_running.progress",
592            );
593        }
594    })
595}
596
/// Options for `cancel_handle_with_options`.
pub(crate) struct CancelOptions {
    /// Recorded in the handle's cancel state; when `true` the waiter reports
    /// the exit as timed out instead of killed.
    pub(crate) timed_out: bool,
    /// When `Some(timeout)`, block for up to `timeout` waiting for the
    /// waiter's final result payload. `None` returns right after the kill.
    pub(crate) wait_result: Option<Duration>,
}
601
/// What `cancel_handle_with_options` did.
pub(crate) struct CancelOutcome {
    /// `true` when the handle was found and this call was the one that
    /// requested cancellation; `false` when it was missing or already
    /// cancelled.
    pub(crate) cancelled: bool,
    /// The waiter's result payload, present only when the caller asked to
    /// wait and the payload arrived within the timeout.
    pub(crate) result: Option<VmValue>,
}
606
607/// Cancel a specific in-flight long-running handle. Kills the process and lets
608/// the waiter drain output/artifacts. Returns `true` if the handle was found
609/// and cancellation was newly requested.
610pub fn cancel_handle(handle_id: &str) -> bool {
611    cancel_handle_with_options(
612        handle_id,
613        CancelOptions {
614            timed_out: false,
615            wait_result: None,
616        },
617    )
618    .cancelled
619}
620
621pub(crate) fn cancel_handle_with_options(handle_id: &str, options: CancelOptions) -> CancelOutcome {
622    let (killer, cancel_state, result_rx) = {
623        let mut store = HANDLE_STORE
624            .lock()
625            .expect("long-running handle store poisoned");
626        let Some(entry) = store.entries.get_mut(handle_id) else {
627            return CancelOutcome {
628                cancelled: false,
629                result: None,
630            };
631        };
632        if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
633            return CancelOutcome {
634                cancelled: false,
635                result: None,
636            };
637        }
638        entry
639            .cancel_state
640            .timed_out
641            .store(options.timed_out, Ordering::Release);
642        let result_rx = options.wait_result.map(|_| {
643            let (tx, rx) = std::sync::mpsc::sync_channel::<VmValue>(1);
644            entry.result_tx = Some(tx);
645            rx
646        });
647        (entry.killer.clone(), entry.cancel_state.clone(), result_rx)
648    };
649    do_kill(killer, cancel_state);
650    let result = match (options.wait_result, result_rx) {
651        (Some(timeout), Some(rx)) => rx.recv_timeout(timeout).ok(),
652        _ => None,
653    };
654    CancelOutcome {
655        cancelled: true,
656        result,
657    }
658}
659
/// `(killer, cancel state)` pair collected by `cancel_session_handles` while
/// it holds the store lock, so the actual kills can run after the lock is
/// released. Named because the trait-object types are noisy to spell inline.
type SessionKillEntry = (Arc<dyn ProcessKiller>, Arc<CancelState>);
664
665/// Cancel all in-flight handles for a given session. Called by the
666/// session-end hook to avoid orphaned processes.
667pub fn cancel_session_handles(session_id: &str) {
668    let to_kill: Vec<SessionKillEntry> = {
669        let store = HANDLE_STORE
670            .lock()
671            .expect("long-running handle store poisoned");
672        let matching: Vec<String> = store
673            .entries
674            .iter()
675            .filter(|(_, e)| e.session_id == session_id)
676            .map(|(id, _)| id.clone())
677            .collect();
678        matching
679            .into_iter()
680            .filter_map(|id| {
681                let entry = store.entries.get(&id)?;
682                if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
683                    return None;
684                }
685                entry.cancel_state.timed_out.store(false, Ordering::Release);
686                Some((entry.killer.clone(), entry.cancel_state.clone()))
687            })
688            .collect()
689    };
690    for (killer, cancel_state) in to_kill {
691        do_kill(killer, cancel_state);
692    }
693}
694
/// Kill the process behind a handle and mark its cancel state as cancelled.
///
/// Shared by `cancel_handle_with_options` and `cancel_session_handles`. Both
/// callers already flip `cancelled` with `swap` before calling this, so the
/// store below re-asserts a flag that is normally set already.
fn do_kill(killer: Arc<dyn ProcessKiller>, cancel_state: Arc<CancelState>) {
    // Kill via the handle's killer (works whether or not we still own
    // the handle). The waiter owns process reaping and artifact finalization.
    killer.kill();
    cancel_state.cancelled.store(true, Ordering::Release);
}
703
704/// Register the session-cleanup hook with harn-vm. Uses a `OnceLock` so the
705/// hook is registered exactly once even if `register_builtins` is called
706/// multiple times (e.g. in tests).
707pub(crate) fn register_cleanup_hook() {
708    static REGISTERED: OnceLock<()> = OnceLock::new();
709    REGISTERED.get_or_init(|| {
710        let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
711            cancel_session_handles(session_id);
712        });
713        harn_vm::register_session_end_hook(hook);
714    });
715}
716
717fn decode_exit_status(status: process_handle::ExitStatus) -> (i32, Option<String>) {
718    if let Some(code) = status.code {
719        return (code, None);
720    }
721    if let Some(sig) = status.signal {
722        return (-1, Some(format!("SIG{sig}")));
723    }
724    (-1, None)
725}
726
727/// Register a completion notifier for `handle_id`. The waiter thread sends
728/// `()` on the returned receiver after it pushes the feedback item to the
729/// global queue. Returns `None` if the handle is no longer in the store
730/// (e.g. already cancelled or completed). Used by tests to await waiter
731/// completion deterministically — no polling, no `thread::sleep`.
732pub fn register_completion_notifier(handle_id: &str) -> Option<std::sync::mpsc::Receiver<()>> {
733    let (tx, rx) = std::sync::mpsc::sync_channel::<()>(1);
734    let mut store = HANDLE_STORE
735        .lock()
736        .expect("long-running handle store poisoned");
737    let entry = store.entries.get_mut(handle_id)?;
738    entry.completion_tx = Some(tx);
739    Some(rx)
740}