Skip to main content

harn_hostlib/tools/
long_running.rs

1//! Long-running tool handle machinery.
2//!
3//! When a caller passes `long_running: true` to `run_command`, `run_test`, or
4//! `run_build_command`, the builtin spawns the child process without waiting,
5//! registers it here, and returns a handle dict immediately:
6//!
7//! ```json
//! { "handle_id": "hto-<host-pid-hex>-<n>", "started_at": "<RFC 3339 timestamp>", "command": "...", ... }
9//! ```
10//!
11//! A background thread waits for the child and, when it exits, calls
12//! `harn_vm::push_pending_feedback_global(session_id, "tool_result", json)`
13//! so the agent-loop's next turn-preflight picks it up.
14//!
15//! ### Cancellation
16//!
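//! `cancel_handle(handle_id)` kills the spawned process with SIGKILL, sent to
//! its process group and then to the process itself. The signal is sent
//! immediately; there is no grace period or timeout. On non-Unix targets only
//! a child not yet claimed by its waiter thread can be killed. The session-end
//! hook installed by `register_cleanup_hook` kills every in-flight handle
//! associated with the ending session.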
20//!
21//! #### PID-based signaling
22//!
23//! The waiter thread takes ownership of the `Child` object to drain
24//! stdout/stderr and call `wait()`. To keep cancellation possible even
25//! after the waiter has taken the `Child`, we store the raw OS process ID
26//! in the entry and kill by PID when needed. On Unix we call `kill(2)`
27//! directly via an `extern "C"` declaration (no `libc` crate required).
28//! A shared `cancelled` flag suppresses the feedback push when the waiter
29//! sees an exit caused by cancellation.
30
31use std::collections::BTreeMap;
32use std::path::PathBuf;
33use std::process::{Child, Stdio};
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use std::sync::{Arc, LazyLock, Mutex, OnceLock};
36use std::time::Duration;
37
38use harn_vm::VmValue;
39
40use harn_vm::process_sandbox;
41
42use crate::error::HostlibError;
43use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
44
/// Process-wide sequence number used as the `<n>` suffix of handle ids.
///
/// It starts at 1 and is only ever incremented (`fetch_add`), so every handle
/// id minted by this host process is distinct.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
47
/// Cancellation marker shared (through an `Arc`) by a store entry and the
/// waiter thread watching the same child.
struct CancelState {
    /// Raised before the process is signalled by `cancel_handle` /
    /// `cancel_session_handles`; while it is set, the waiter does not push a
    /// `tool_result` for the exit it observes.
    cancelled: AtomicBool,
}
54
55/// Shared state for a single in-flight child process.
56struct HandleEntry {
57    /// The child process. `None` after the waiter thread takes ownership.
58    child: Option<Child>,
59    /// Raw OS process ID — available even after the waiter took `child`.
60    pid: u32,
61    session_id: String,
62    /// Shared with the waiter thread.
63    cancel_state: Arc<CancelState>,
64}
65
66#[derive(Default)]
67struct HandleStore {
68    entries: BTreeMap<String, HandleEntry>,
69}
70
71static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
72    LazyLock::new(|| Mutex::new(HandleStore::default()));
73
/// What the caller learns right after a long-running child is spawned.
///
/// The calling builtin turns it into a response dict via
/// `into_handle_response`.
pub struct LongRunningHandleInfo {
    /// Command identifier, minted the same way as for foreground commands.
    pub command_id: String,
    /// Opaque handle id of the form `"hto-<host-pid-hex>-<n>"`.
    pub handle_id: String,
    /// Spawn time as an RFC 3339 timestamp.
    pub started_at: String,
    /// OS process id of the spawned child.
    pub pid: u32,
    /// Process group id of the child, or `None` when it could not be determined.
    pub process_group_id: Option<u32>,
    /// Program and arguments joined with single spaces (not shell-quoted).
    pub command_display: String,
}
90
91impl LongRunningHandleInfo {
92    /// Convert into the standard handle response dict returned to the agent.
93    pub fn into_handle_response(self) -> VmValue {
94        proc::running_response(
95            self.command_id,
96            self.handle_id,
97            self.pid,
98            self.process_group_id,
99            self.started_at,
100            self.command_display,
101        )
102    }
103}
104
105/// Spawn the argv as a long-running child process and return a handle.
106///
107/// The background waiter calls `push_pending_feedback_global` when the
108/// process exits so the next agent-loop turn sees the result.
109pub fn spawn_long_running(
110    builtin: &'static str,
111    program: String,
112    args: Vec<String>,
113    cwd: Option<PathBuf>,
114    env: BTreeMap<String, String>,
115    session_id: String,
116) -> Result<LongRunningHandleInfo, HostlibError> {
117    spawn_long_running_with_options(
118        builtin,
119        program,
120        args,
121        cwd,
122        env,
123        EnvMode::InheritClean,
124        CaptureConfig::default(),
125        session_id,
126    )
127}
128
129pub(crate) fn spawn_long_running_with_options(
130    builtin: &'static str,
131    program: String,
132    args: Vec<String>,
133    cwd: Option<PathBuf>,
134    env: BTreeMap<String, String>,
135    env_mode: EnvMode,
136    capture: CaptureConfig,
137    session_id: String,
138) -> Result<LongRunningHandleInfo, HostlibError> {
139    if program.is_empty() {
140        return Err(HostlibError::InvalidParameter {
141            builtin,
142            param: "argv",
143            message: "first element of argv must be a non-empty program name".to_string(),
144        });
145    }
146
147    let mut command =
148        process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
149            builtin,
150            message: format!("sandbox setup failed: {e:?}"),
151        })?;
152
153    if let Some(cwd_path) = cwd.as_ref() {
154        process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
155            builtin,
156            message: format!("sandbox cwd rejected: {e:?}"),
157        })?;
158        command.current_dir(cwd_path);
159    }
160
161    proc::configure_background_process_group(&mut command);
162
163    if matches!(env_mode, EnvMode::Replace) {
164        command.env_clear();
165    }
166    if !env.is_empty() {
167        for (key, value) in &env {
168            command.env(key, value);
169        }
170    }
171
172    command.stdout(Stdio::piped());
173    command.stderr(Stdio::piped());
174    command.stdin(Stdio::null());
175
176    let child = command.spawn().map_err(|e| {
177        if let Some(violation) = process_sandbox::process_spawn_error(&e) {
178            return HostlibError::Backend {
179                builtin,
180                message: format!("sandbox rejected spawn: {violation:?}"),
181            };
182        }
183        HostlibError::Backend {
184            builtin,
185            message: format!("spawn failed: {e}"),
186        }
187    })?;
188
189    let pid = child.id();
190    let process_group_id = proc::child_process_group_id(pid);
191    let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
192    let handle_id = format!("hto-{:x}-{id}", std::process::id());
193    let command_id = proc::next_command_id();
194    let started_at = proc::now_rfc3339();
195
196    let mut all_argv = vec![program.clone()];
197    all_argv.extend(args.iter().cloned());
198    let command_display = all_argv.join(" ");
199
200    let cancel_state = Arc::new(CancelState {
201        cancelled: AtomicBool::new(false),
202    });
203
204    {
205        let mut store = HANDLE_STORE
206            .lock()
207            .expect("long-running handle store poisoned");
208        store.entries.insert(
209            handle_id.clone(),
210            HandleEntry {
211                child: Some(child),
212                pid,
213                session_id: session_id.clone(),
214                cancel_state: cancel_state.clone(),
215            },
216        );
217    }
218
219    let waiter_command_id = command_id.clone();
220    let waiter_handle_id = handle_id.clone();
221    let waiter_session_id = session_id;
222    let waiter_started_at = started_at.clone();
223    std::thread::Builder::new()
224        .name(format!("hto-waiter-{waiter_handle_id}"))
225        .spawn(move || {
226            waiter_thread(
227                waiter_command_id,
228                waiter_handle_id,
229                waiter_session_id,
230                cancel_state,
231                capture,
232                waiter_started_at,
233                process_group_id,
234            );
235        })
236        .map_err(|e| HostlibError::Backend {
237            builtin,
238            message: format!("failed to spawn waiter thread: {e}"),
239        })?;
240
241    Ok(LongRunningHandleInfo {
242        command_id,
243        handle_id,
244        started_at,
245        pid,
246        process_group_id,
247        command_display,
248    })
249}
250
251/// Background thread that waits for a child process and fires feedback.
252fn waiter_thread(
253    command_id: String,
254    handle_id: String,
255    session_id: String,
256    cancel_state: Arc<CancelState>,
257    capture: CaptureConfig,
258    started_at: String,
259    process_group_id: Option<u32>,
260) {
261    let waiter_start = std::time::Instant::now();
262
263    // Take the child out of the store. If the entry is already gone (i.e.
264    // cancel_handle ran and removed it before us), exit without action.
265    let mut child = {
266        let mut store = HANDLE_STORE
267            .lock()
268            .expect("long-running handle store poisoned");
269        match store.entries.get_mut(&handle_id) {
270            Some(entry) => match entry.child.take() {
271                Some(c) => c,
272                None => return, // already cancelled before we ran
273            },
274            None => return, // entry removed (cancelled before store insert — shouldn't happen)
275        }
276    };
277
278    // Drain stdout/stderr on separate threads to prevent pipe deadlock.
279    use std::io::Read;
280    let mut stdout_bytes = Vec::new();
281    let mut stderr_bytes = Vec::new();
282    let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
283    let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
284
285    if let Some(mut out) = child.stdout.take() {
286        std::thread::spawn(move || {
287            let _ = out.read_to_end(&mut stdout_bytes);
288            let _ = out_tx.send(stdout_bytes);
289        });
290    }
291    if let Some(mut err) = child.stderr.take() {
292        std::thread::spawn(move || {
293            let _ = err.read_to_end(&mut stderr_bytes);
294            let _ = err_tx.send(stderr_bytes);
295        });
296    }
297
298    let status = child.wait().ok();
299
300    let stdout = out_rx
301        .recv_timeout(Duration::from_secs(5))
302        .unwrap_or_default();
303    let stderr = err_rx
304        .recv_timeout(Duration::from_secs(5))
305        .unwrap_or_default();
306
307    // Remove our entry from the store.
308    {
309        let mut store = HANDLE_STORE
310            .lock()
311            .expect("long-running handle store poisoned");
312        store.entries.remove(&handle_id);
313    }
314
315    // If cancellation was requested, don't push feedback — the caller
316    // that cancelled doesn't want to receive a spurious tool_result.
317    if cancel_state.cancelled.load(Ordering::Acquire) {
318        return;
319    }
320
321    let (exit_code, signal_name) = match status {
322        Some(s) => decode_exit_status(s),
323        // wait() itself failed — treat as killed (extremely unusual).
324        None => (-1, Some("SIGKILL".to_string())),
325    };
326    let duration = waiter_start.elapsed();
327    let duration_ms = duration.as_millis() as i64;
328    let artifacts = match proc::persist_artifacts(&command_id, &stdout, &stderr, Some(&handle_id)) {
329        Ok(artifacts) => artifacts,
330        Err(_) => return,
331    };
332    let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
333
334    let mut payload = serde_json::Map::new();
335    payload.insert(
336        "command_id".into(),
337        serde_json::Value::String(command_id.clone()),
338    );
339    payload.insert(
340        "status".into(),
341        serde_json::Value::String(CommandStatus::Completed.as_str().to_string()),
342    );
343    payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
344    payload.insert("started_at".into(), serde_json::Value::String(started_at));
345    payload.insert(
346        "ended_at".into(),
347        serde_json::Value::String(proc::now_rfc3339()),
348    );
349    payload.insert(
350        "duration_ms".into(),
351        serde_json::Value::Number(duration_ms.into()),
352    );
353    payload.insert(
354        "exit_code".into(),
355        serde_json::Value::Number(exit_code.into()),
356    );
357    payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
358    payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
359    payload.insert(
360        "output_path".into(),
361        serde_json::Value::String(artifacts.output_path.display().to_string()),
362    );
363    payload.insert(
364        "stdout_path".into(),
365        serde_json::Value::String(artifacts.stdout_path.display().to_string()),
366    );
367    payload.insert(
368        "stderr_path".into(),
369        serde_json::Value::String(artifacts.stderr_path.display().to_string()),
370    );
371    payload.insert(
372        "line_count".into(),
373        serde_json::Value::Number(artifacts.line_count.into()),
374    );
375    payload.insert(
376        "byte_count".into(),
377        serde_json::Value::Number(artifacts.byte_count.into()),
378    );
379    payload.insert(
380        "output_sha256".into(),
381        serde_json::Value::String(artifacts.output_sha256),
382    );
383    if let Some(pgid) = process_group_id {
384        payload.insert(
385            "process_group_id".into(),
386            serde_json::Value::Number((pgid as u64).into()),
387        );
388    }
389    if let Some(sig) = signal_name {
390        payload.insert("signal".into(), serde_json::Value::String(sig));
391    } else {
392        payload.insert("signal".into(), serde_json::Value::Null);
393    }
394
395    let content = serde_json::to_string(&payload).unwrap_or_default();
396    harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
397}
398
399/// Cancel a specific in-flight long-running handle. Kills the process and
400/// removes the entry. Returns `true` if the handle was found and cancelled.
401pub fn cancel_handle(handle_id: &str) -> bool {
402    let (pid, child, cancel_state) = {
403        let mut store = HANDLE_STORE
404            .lock()
405            .expect("long-running handle store poisoned");
406        match store.entries.remove(handle_id) {
407            None => return false,
408            Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
409        }
410    };
411    do_kill(pid, child, cancel_state);
412    true
413}
414
415/// Cancel all in-flight handles for a given session. Called by the
416/// session-end hook to avoid orphaned processes.
417pub fn cancel_session_handles(session_id: &str) {
418    let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
419        let mut store = HANDLE_STORE
420            .lock()
421            .expect("long-running handle store poisoned");
422        let matching: Vec<String> = store
423            .entries
424            .iter()
425            .filter(|(_, e)| e.session_id == session_id)
426            .map(|(id, _)| id.clone())
427            .collect();
428        matching
429            .into_iter()
430            .filter_map(|id| {
431                store.entries.remove(&id).map(|mut e| {
432                    let child = e.child.take();
433                    (e.pid, child, e.cancel_state.clone())
434                })
435            })
436            .collect()
437    };
438    for (pid, child, cancel_state) in to_kill {
439        do_kill(pid, child, cancel_state);
440    }
441}
442
443/// Set the cancellation flag and kill the process. Used by both `cancel_handle`
444/// and `cancel_session_handles`.
445fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
446    // Signal cancellation so the waiter (if still running) skips feedback.
447    cancel_state.cancelled.store(true, Ordering::Release);
448    if let Some(mut c) = child {
449        // Waiter hasn't taken the child yet — kill it directly.
450        kill_child(&mut c);
451    } else {
452        // Waiter already took the child; signal by PID.
453        kill_pid_or_group(pid);
454    }
455}
456
457/// Register the session-cleanup hook with harn-vm. Uses a `OnceLock` so the
458/// hook is registered exactly once even if `register_builtins` is called
459/// multiple times (e.g. in tests).
460pub(crate) fn register_cleanup_hook() {
461    static REGISTERED: OnceLock<()> = OnceLock::new();
462    REGISTERED.get_or_init(|| {
463        let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
464            cancel_session_handles(session_id);
465        });
466        harn_vm::register_session_end_hook(hook);
467    });
468}
469
470fn kill_child(child: &mut Child) {
471    kill_pid_or_group(child.id());
472    let _ = child.kill();
473    let _ = child.wait();
474}
475
/// Send SIGKILL to the process group whose id is `pid`, then to `pid` itself.
///
/// This is used when the waiter thread already owns the `Child`, so only the
/// PID is available. `kill_child` also uses it to take down anything sharing
/// the child's process group. Errors from `kill(2)` are ignored, for example
/// `ESRCH` when the group or process no longer exists: this is best-effort
/// teardown.
///
/// A `pid` that is not a strictly positive `pid_t` is ignored. This matters
/// for two reasons:
/// - `kill(0, …)` and `kill(-0, …)` address the caller's *own* process group.
/// - A value above `i32::MAX` would wrap negative under `as i32`, and after
///   negation could reach `kill(-1, …)`, which signals every process the
///   caller is allowed to signal.
///
/// NOTE: on non-Unix targets this is a no-op. A child whose `Child` handle is
/// already owned by the waiter therefore cannot be killed there.
fn kill_pid_or_group(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }
        const SIGKILL: i32 = 9;

        if pid == 0 || pid > i32::MAX as u32 {
            return;
        }
        let pid = pid as i32;
        // SAFETY: `kill` is the POSIX `int kill(pid_t, int)`; `pid_t` is a
        // 32-bit signed integer on the Unix targets this crate supports. It
        // takes no pointers, so the calls are memory-safe. `pid` is strictly
        // positive here, so `-pid` names a specific process group (never our
        // own group `0`, never the broadcast target `-1` by wrap-around).
        unsafe {
            kill(-pid, SIGKILL); // The whole process group first.
            kill(pid, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
    }
}
496
/// Split an `ExitStatus` into `(exit_code, signal_name)` for the feedback
/// payload.
///
/// - Normal exit: `(code, None)`.
/// - Unix, terminated by a signal: `(-1, Some(name))`. `name` is the POSIX
///   name (`"SIGKILL"`, `"SIGTERM"`, …) for signal numbers that are identical
///   on Linux, macOS and the BSDs, and `"SIG<n>"` otherwise. This matches the
///   `"SIGKILL"` spelling the waiter already uses when `wait()` fails,
///   instead of emitting `"SIG9"`.
/// - Unix status with neither an exit code nor a signal: `(-1, None)`.
/// - Non-Unix: `(code, None)`, with `-1` when no code is available.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;

        /// Conventional name for signals whose number is portable across
        /// Linux and macOS/BSD; other numbers fall back to `SIG<n>`.
        fn signal_name(sig: i32) -> String {
            let name = match sig {
                1 => "SIGHUP",
                2 => "SIGINT",
                3 => "SIGQUIT",
                4 => "SIGILL",
                5 => "SIGTRAP",
                6 => "SIGABRT",
                8 => "SIGFPE",
                9 => "SIGKILL",
                11 => "SIGSEGV",
                13 => "SIGPIPE",
                14 => "SIGALRM",
                15 => "SIGTERM",
                _ => return format!("SIG{sig}"),
            };
            name.to_string()
        }

        match (status.code(), status.signal()) {
            (Some(code), _) => (code, None),
            (None, Some(sig)) => (-1, Some(signal_name(sig))),
            (None, None) => (-1, None),
        }
    }
    #[cfg(not(unix))]
    {
        (status.code().unwrap_or(-1), None)
    }
}