Skip to main content

harn_hostlib/tools/
long_running.rs

1//! Long-running tool handle machinery.
2//!
3//! When a caller passes `long_running: true` to `run_command`, `run_test`, or
4//! `run_build_command`, the builtin spawns the child process without waiting,
5//! registers it here, and returns a handle dict immediately:
6//!
7//! ```json
8//! {
9//!   "handle_id": "hto-<pid-hex>-<n>",
10//!   "started_at": "...",
11//!   "command_or_op_descriptor": "..."
12//! }
13//! ```
14//!
15//! A background thread waits for the child and, when it exits, calls
16//! `harn_vm::push_pending_feedback_global(session_id, "tool_result", json)`
17//! so the agent-loop's next turn-preflight picks it up.
18//!
19//! ### Cancellation
20//!
//! `cancel_handle(handle_id)` sends SIGKILL to the spawned process (on Unix,
//! to its process group as well) and suppresses its completion feedback.
//! The session-end hook registered on startup kills every in-flight handle
//! associated with the ending session.
24//!
25//! #### PID-based signaling
26//!
//! The waiter thread takes ownership of the `Child` object to drain
//! stdout/stderr and call `wait()`. To keep cancellation possible even
//! after the waiter has taken the `Child`, we store the raw OS process ID
//! in the entry and kill by PID when needed. On Unix we call `kill(2)`
//! directly via an `extern "C"` declaration (no `libc` crate required),
//! signalling the child's process group first and then the PID itself; on
//! other platforms PID-based signalling is currently a no-op.
//! A shared `cancelled` flag suppresses the feedback push when the waiter
//! sees an exit caused by cancellation.
34
35use std::collections::BTreeMap;
36use std::path::PathBuf;
37use std::process::{Child, Stdio};
38use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
39use std::sync::{Arc, LazyLock, Mutex, OnceLock};
40use std::time::Duration;
41
42use harn_vm::VmValue;
43
44use harn_vm::process_sandbox;
45
46use crate::error::HostlibError;
47use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
48
/// Atomic counter supplying the `<n>` suffix of handle IDs
/// (`hto-<pid-hex>-<n>`). Starts at 1 and is only ever incremented.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Shared cancellation state between the store entry and its waiter thread.
struct CancelState {
    /// Set to `true` by `do_kill` (reached from `cancel_handle` /
    /// `cancel_session_handles`) before the process is killed. The waiter
    /// checks it after the child exits and skips the feedback push when set.
    cancelled: AtomicBool,
}

/// Store entry for a single in-flight child process, keyed by handle ID.
struct HandleEntry {
    /// The child process. `None` after the waiter thread takes ownership.
    child: Option<Child>,
    /// Raw OS process ID — available even after the waiter took `child`,
    /// so cancellation can still signal the process by PID.
    pid: u32,
    /// Session that owns the handle; `cancel_session_handles` matches on it.
    session_id: String,
    /// Shared with the waiter thread.
    cancel_state: Arc<CancelState>,
}

/// All in-flight long-running handles, keyed by handle ID. Entries are
/// normally removed either by cancellation or by the waiter thread once the
/// child has exited.
#[derive(Default)]
struct HandleStore {
    entries: BTreeMap<String, HandleEntry>,
}

/// Process-wide handle store, lazily initialised on first access.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
    LazyLock::new(|| Mutex::new(HandleStore::default()));
77
/// Metadata returned to the caller immediately when a long-running spawn
/// succeeds. Serialised as a response dict by the calling builtin via
/// `into_handle_response`.
pub struct LongRunningHandleInfo {
    /// Command identifier shared with foreground command responses.
    pub command_id: String,
    /// Opaque handle identifier, e.g. `"hto-<pid-hex>-<n>"`, where
    /// `<pid-hex>` is the host (not child) process ID in hex.
    pub handle_id: String,
    /// RFC 3339 timestamp of the spawn.
    pub started_at: String,
    /// Raw child process id reported by the platform.
    pub pid: u32,
    /// Child process group id when the platform exposes it.
    pub process_group_id: Option<u32>,
    /// Human-readable display form of the argv (space-joined).
    pub command_display: String,
}
94
95impl LongRunningHandleInfo {
96    /// Convert into the standard handle response dict returned to the agent.
97    pub fn into_handle_response(self) -> VmValue {
98        proc::running_response(
99            self.command_id,
100            self.handle_id,
101            self.pid,
102            self.process_group_id,
103            self.started_at,
104            self.command_display,
105        )
106    }
107}
108
109/// Spawn the argv as a long-running child process and return a handle.
110///
111/// The background waiter calls `push_pending_feedback_global` when the
112/// process exits so the next agent-loop turn sees the result.
113pub fn spawn_long_running(
114    builtin: &'static str,
115    program: String,
116    args: Vec<String>,
117    cwd: Option<PathBuf>,
118    env: BTreeMap<String, String>,
119    session_id: String,
120) -> Result<LongRunningHandleInfo, HostlibError> {
121    spawn_long_running_with_options(
122        builtin,
123        program,
124        args,
125        cwd,
126        env,
127        EnvMode::InheritClean,
128        CaptureConfig::default(),
129        session_id,
130    )
131}
132
133pub(crate) fn spawn_long_running_with_options(
134    builtin: &'static str,
135    program: String,
136    args: Vec<String>,
137    cwd: Option<PathBuf>,
138    env: BTreeMap<String, String>,
139    env_mode: EnvMode,
140    capture: CaptureConfig,
141    session_id: String,
142) -> Result<LongRunningHandleInfo, HostlibError> {
143    if program.is_empty() {
144        return Err(HostlibError::InvalidParameter {
145            builtin,
146            param: "argv",
147            message: "first element of argv must be a non-empty program name".to_string(),
148        });
149    }
150
151    let mut command =
152        process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
153            builtin,
154            message: format!("sandbox setup failed: {e:?}"),
155        })?;
156
157    if let Some(cwd_path) = cwd.as_ref() {
158        process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
159            builtin,
160            message: format!("sandbox cwd rejected: {e:?}"),
161        })?;
162        command.current_dir(cwd_path);
163    }
164
165    proc::configure_background_process_group(&mut command);
166
167    if matches!(env_mode, EnvMode::Replace) {
168        command.env_clear();
169    }
170    if !env.is_empty() {
171        for (key, value) in &env {
172            command.env(key, value);
173        }
174    }
175
176    command.stdout(Stdio::piped());
177    command.stderr(Stdio::piped());
178    command.stdin(Stdio::null());
179
180    let child = command.spawn().map_err(|e| {
181        if let Some(violation) = process_sandbox::process_spawn_error(&e) {
182            return HostlibError::Backend {
183                builtin,
184                message: format!("sandbox rejected spawn: {violation:?}"),
185            };
186        }
187        HostlibError::Backend {
188            builtin,
189            message: format!("spawn failed: {e}"),
190        }
191    })?;
192
193    let pid = child.id();
194    let process_group_id = proc::child_process_group_id(pid);
195    let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
196    let handle_id = format!("hto-{:x}-{id}", std::process::id());
197    let command_id = proc::next_command_id();
198    let started_at = proc::now_rfc3339();
199
200    let mut all_argv = vec![program.clone()];
201    all_argv.extend(args.iter().cloned());
202    let command_display = all_argv.join(" ");
203
204    let cancel_state = Arc::new(CancelState {
205        cancelled: AtomicBool::new(false),
206    });
207
208    {
209        let mut store = HANDLE_STORE
210            .lock()
211            .expect("long-running handle store poisoned");
212        store.entries.insert(
213            handle_id.clone(),
214            HandleEntry {
215                child: Some(child),
216                pid,
217                session_id: session_id.clone(),
218                cancel_state: cancel_state.clone(),
219            },
220        );
221    }
222
223    let waiter_command_id = command_id.clone();
224    let waiter_handle_id = handle_id.clone();
225    let waiter_session_id = session_id;
226    let waiter_started_at = started_at.clone();
227    let waiter_command_display = command_display.clone();
228    std::thread::Builder::new()
229        .name(format!("hto-waiter-{waiter_handle_id}"))
230        .spawn(move || {
231            waiter_thread(
232                waiter_command_id,
233                waiter_handle_id,
234                waiter_session_id,
235                cancel_state,
236                capture,
237                waiter_started_at,
238                process_group_id,
239                waiter_command_display,
240            );
241        })
242        .map_err(|e| HostlibError::Backend {
243            builtin,
244            message: format!("failed to spawn waiter thread: {e}"),
245        })?;
246
247    Ok(LongRunningHandleInfo {
248        command_id,
249        handle_id,
250        started_at,
251        pid,
252        process_group_id,
253        command_display,
254    })
255}
256
257/// Background thread that waits for a child process and fires feedback.
258fn waiter_thread(
259    command_id: String,
260    handle_id: String,
261    session_id: String,
262    cancel_state: Arc<CancelState>,
263    capture: CaptureConfig,
264    started_at: String,
265    process_group_id: Option<u32>,
266    command_display: String,
267) {
268    let waiter_start = std::time::Instant::now();
269
270    // Take the child out of the store. If the entry is already gone (i.e.
271    // cancel_handle ran and removed it before us), exit without action.
272    let mut child = {
273        let mut store = HANDLE_STORE
274            .lock()
275            .expect("long-running handle store poisoned");
276        match store.entries.get_mut(&handle_id) {
277            Some(entry) => match entry.child.take() {
278                Some(c) => c,
279                None => return, // already cancelled before we ran
280            },
281            None => return, // entry removed (cancelled before store insert — shouldn't happen)
282        }
283    };
284
285    // Drain stdout/stderr on separate threads to prevent pipe deadlock.
286    use std::io::Read;
287    let mut stdout_bytes = Vec::new();
288    let mut stderr_bytes = Vec::new();
289    let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
290    let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
291
292    if let Some(mut out) = child.stdout.take() {
293        std::thread::spawn(move || {
294            let _ = out.read_to_end(&mut stdout_bytes);
295            let _ = out_tx.send(stdout_bytes);
296        });
297    }
298    if let Some(mut err) = child.stderr.take() {
299        std::thread::spawn(move || {
300            let _ = err.read_to_end(&mut stderr_bytes);
301            let _ = err_tx.send(stderr_bytes);
302        });
303    }
304
305    let status = child.wait().ok();
306
307    let stdout = out_rx
308        .recv_timeout(Duration::from_secs(5))
309        .unwrap_or_default();
310    let stderr = err_rx
311        .recv_timeout(Duration::from_secs(5))
312        .unwrap_or_default();
313
314    // Remove our entry from the store.
315    {
316        let mut store = HANDLE_STORE
317            .lock()
318            .expect("long-running handle store poisoned");
319        store.entries.remove(&handle_id);
320    }
321
322    // If cancellation was requested, don't push feedback — the caller
323    // that cancelled doesn't want to receive a spurious tool_result.
324    if cancel_state.cancelled.load(Ordering::Acquire) {
325        return;
326    }
327
328    let (exit_code, signal_name) = match status {
329        Some(s) => decode_exit_status(s),
330        // wait() itself failed — treat as killed (extremely unusual).
331        None => (-1, Some("SIGKILL".to_string())),
332    };
333    let duration = waiter_start.elapsed();
334    let duration_ms = duration.as_millis() as i64;
335    let artifacts = match proc::persist_artifacts(&command_id, &stdout, &stderr, Some(&handle_id)) {
336        Ok(artifacts) => artifacts,
337        Err(_) => return,
338    };
339    let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
340
341    let mut payload = serde_json::Map::new();
342    payload.insert(
343        "command_id".into(),
344        serde_json::Value::String(command_id.clone()),
345    );
346    payload.insert(
347        "status".into(),
348        serde_json::Value::String(CommandStatus::Completed.as_str().to_string()),
349    );
350    payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
351    payload.insert(
352        "command_or_op_descriptor".into(),
353        serde_json::Value::String(command_display),
354    );
355    payload.insert("started_at".into(), serde_json::Value::String(started_at));
356    payload.insert(
357        "ended_at".into(),
358        serde_json::Value::String(proc::now_rfc3339()),
359    );
360    payload.insert(
361        "duration_ms".into(),
362        serde_json::Value::Number(duration_ms.into()),
363    );
364    payload.insert(
365        "exit_code".into(),
366        serde_json::Value::Number(exit_code.into()),
367    );
368    payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
369    payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
370    payload.insert(
371        "output_path".into(),
372        serde_json::Value::String(artifacts.output_path.display().to_string()),
373    );
374    payload.insert(
375        "stdout_path".into(),
376        serde_json::Value::String(artifacts.stdout_path.display().to_string()),
377    );
378    payload.insert(
379        "stderr_path".into(),
380        serde_json::Value::String(artifacts.stderr_path.display().to_string()),
381    );
382    payload.insert(
383        "line_count".into(),
384        serde_json::Value::Number(artifacts.line_count.into()),
385    );
386    payload.insert(
387        "byte_count".into(),
388        serde_json::Value::Number(artifacts.byte_count.into()),
389    );
390    payload.insert(
391        "output_sha256".into(),
392        serde_json::Value::String(artifacts.output_sha256),
393    );
394    if let Some(pgid) = process_group_id {
395        payload.insert(
396            "process_group_id".into(),
397            serde_json::Value::Number((pgid as u64).into()),
398        );
399    }
400    if let Some(sig) = signal_name {
401        payload.insert("signal".into(), serde_json::Value::String(sig));
402    } else {
403        payload.insert("signal".into(), serde_json::Value::Null);
404    }
405
406    let content = serde_json::to_string(&payload).unwrap_or_default();
407    harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
408}
409
410/// Cancel a specific in-flight long-running handle. Kills the process and
411/// removes the entry. Returns `true` if the handle was found and cancelled.
412pub fn cancel_handle(handle_id: &str) -> bool {
413    let (pid, child, cancel_state) = {
414        let mut store = HANDLE_STORE
415            .lock()
416            .expect("long-running handle store poisoned");
417        match store.entries.remove(handle_id) {
418            None => return false,
419            Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
420        }
421    };
422    do_kill(pid, child, cancel_state);
423    true
424}
425
426/// Cancel all in-flight handles for a given session. Called by the
427/// session-end hook to avoid orphaned processes.
428pub fn cancel_session_handles(session_id: &str) {
429    let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
430        let mut store = HANDLE_STORE
431            .lock()
432            .expect("long-running handle store poisoned");
433        let matching: Vec<String> = store
434            .entries
435            .iter()
436            .filter(|(_, e)| e.session_id == session_id)
437            .map(|(id, _)| id.clone())
438            .collect();
439        matching
440            .into_iter()
441            .filter_map(|id| {
442                store.entries.remove(&id).map(|mut e| {
443                    let child = e.child.take();
444                    (e.pid, child, e.cancel_state.clone())
445                })
446            })
447            .collect()
448    };
449    for (pid, child, cancel_state) in to_kill {
450        do_kill(pid, child, cancel_state);
451    }
452}
453
454/// Set the cancellation flag and kill the process. Used by both `cancel_handle`
455/// and `cancel_session_handles`.
456fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
457    // Signal cancellation so the waiter (if still running) skips feedback.
458    cancel_state.cancelled.store(true, Ordering::Release);
459    if let Some(mut c) = child {
460        // Waiter hasn't taken the child yet — kill it directly.
461        kill_child(&mut c);
462    } else {
463        // Waiter already took the child; signal by PID.
464        kill_pid_or_group(pid);
465    }
466}
467
468/// Register the session-cleanup hook with harn-vm. Uses a `OnceLock` so the
469/// hook is registered exactly once even if `register_builtins` is called
470/// multiple times (e.g. in tests).
471pub(crate) fn register_cleanup_hook() {
472    static REGISTERED: OnceLock<()> = OnceLock::new();
473    REGISTERED.get_or_init(|| {
474        let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
475            cancel_session_handles(session_id);
476        });
477        harn_vm::register_session_end_hook(hook);
478    });
479}
480
481fn kill_child(child: &mut Child) {
482    kill_pid_or_group(child.id());
483    let _ = child.kill();
484    let _ = child.wait();
485}
486
/// SIGKILL the process group led by `pid`, then `pid` itself.
///
/// Used when the waiter thread already owns the `Child` but the process must
/// still be terminated. PIDs that cannot name a single process are ignored.
/// `kill(2)` treats `0` as "the caller's own process group" and `-1` as
/// "every process we may signal". A `u32` above `i32::MAX` would wrap into
/// negative values that address arbitrary process groups, or for
/// `u32::MAX`, every process. On non-Unix targets this is currently a no-op.
fn kill_pid_or_group(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }
        /// SIGKILL is signal 9 on every Unix.
        const SIGKILL: i32 = 9;

        let Some(pid) = i32::try_from(pid).ok().filter(|&p| p > 0) else {
            return;
        };
        // SAFETY: kill(2) takes plain integers and touches no Rust memory.
        // `pid` is strictly positive, so neither call can target our own
        // process group (0) or every process (-1), and `-pid` cannot
        // overflow. pid_t and int are i32 on all supported Unix targets.
        // Return values are ignored on purpose: the group or process may
        // already be gone (ESRCH).
        unsafe {
            kill(-pid, SIGKILL); // Process group first.
            kill(pid, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = pid; // No-op on non-Unix; TerminateProcess would require winapi.
    }
}
507
/// Split an exit status into `(exit_code, signal_name)`.
///
/// A normal exit yields `(code, None)`. On Unix, a signal-terminated process
/// yields `(-1, Some(name))`. `name` is the conventional `SIGxxx` name for
/// signals whose numbers agree across Linux, macOS and the BSDs (e.g.
/// `"SIGKILL"`, matching the name the waiter uses when `wait()` fails), and
/// `"SIG<n>"` for any other number. A status that is neither (not expected
/// from `wait()`) yields `(-1, None)`.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        if let Some(code) = status.code() {
            return (code, None);
        }
        if let Some(sig) = status.signal() {
            return (-1, Some(unix_signal_name(sig)));
        }
        (-1, None)
    }
    #[cfg(not(unix))]
    (status.code().unwrap_or(-1), None)
}

/// Conventional name for signal number `sig`. Only numbers that are
/// identical on Linux, macOS and the BSDs are named; anything else is
/// rendered as `SIG<n>`.
#[cfg(unix)]
fn unix_signal_name(sig: i32) -> String {
    let name = match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        8 => "SIGFPE",
        9 => "SIGKILL",
        11 => "SIGSEGV",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        _ => return format!("SIG{sig}"),
    };
    name.to_string()
}