Skip to main content

harn_hostlib/tools/
long_running.rs

1//! Long-running tool handle machinery.
2//!
3//! When a caller passes `long_running: true` to `run_command`, `run_test`, or
4//! `run_build_command`, the builtin spawns the child process without waiting,
5//! registers it here, and returns a handle dict immediately:
6//!
7//! ```json
8//! { "handle_id": "hto-<pid-hex>-<n>", "started_at": <unix_ms>, "command": "..." }
9//! ```
10//!
11//! A background thread waits for the child and, when it exits, calls
12//! `harn_vm::push_pending_feedback_global(session_id, "tool_result", json)`
13//! so the agent-loop's next turn-preflight picks it up.
14//!
15//! ### Cancellation
16//!
17//! `cancel_handle(handle_id)` kills the spawned process (SIGKILL) within
18//! 2 seconds. The session-end hook registered on startup kills every
19//! in-flight handle associated with the ending session.
20//!
21//! #### PID-based signaling
22//!
23//! The waiter thread takes ownership of the `Child` object to drain
24//! stdout/stderr and call `wait()`. To keep cancellation possible even
25//! after the waiter has taken the `Child`, we store the raw OS process ID
26//! in the entry and kill by PID when needed. On Unix we call `kill(2)`
27//! directly via an `extern "C"` declaration (no `libc` crate required).
28//! A shared `cancelled` flag suppresses the feedback push when the waiter
29//! sees an exit caused by cancellation.
30
31use std::collections::BTreeMap;
32use std::path::PathBuf;
33use std::process::{Child, Stdio};
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use std::sync::{Arc, LazyLock, Mutex, OnceLock};
36use std::time::{Duration, SystemTime, UNIX_EPOCH};
37
38use harn_vm::VmValue;
39
40use harn_vm::process_sandbox;
41
42use crate::error::HostlibError;
43
/// Per-process sequence number embedded in every handle ID. Starts at 1 and
/// is incremented each time a new handle ID is minted.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Cancellation flag shared by a store entry and the waiter thread that
/// watches the same child process.
struct CancelState {
    /// Raised by `cancel_handle` / `cancel_session_handles` before they
    /// terminate the process; the waiter consults it so that an exit caused
    /// by cancellation is not reported as a `tool_result`.
    cancelled: AtomicBool,
}

/// Bookkeeping for one in-flight long-running child process.
struct HandleEntry {
    /// Owned process handle; `None` once ownership of the `Child` has been
    /// moved out of the entry.
    child: Option<Child>,
    /// OS process ID captured at spawn time, so the process can still be
    /// signalled without the `Child`.
    pid: u32,
    /// Session that spawned the process; all of a session's handles are
    /// cancelled together.
    session_id: String,
    /// Cancellation flag, also held by the waiter thread.
    cancel_state: Arc<CancelState>,
}

/// Every in-flight handle, keyed by handle ID.
#[derive(Default)]
struct HandleStore {
    entries: BTreeMap<String, HandleEntry>,
}

/// Process-wide handle registry, created lazily on first access.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
    LazyLock::new(|| Mutex::new(HandleStore::default()));
72
/// Metadata handed back to the caller as soon as a long-running spawn
/// succeeds; the calling builtin turns it into a response dict via
/// `into_handle_response`.
#[derive(Debug, Clone)]
pub struct LongRunningHandleInfo {
    /// Opaque handle identifier of the form `"hto-<pid-hex>-<n>"`.
    pub handle_id: String,
    /// Spawn time as milliseconds since the Unix epoch.
    pub started_at_ms: u64,
    /// The argv joined with single spaces; for display only (no quoting).
    pub command_display: String,
}
83
84impl LongRunningHandleInfo {
85    /// Convert into the standard handle response dict returned to the agent.
86    pub fn into_handle_response(self) -> VmValue {
87        super::response::ResponseBuilder::new()
88            .str("handle_id", self.handle_id)
89            .int("started_at", self.started_at_ms as i64)
90            .str("command", self.command_display)
91            .build()
92    }
93}
94
95/// Spawn the argv as a long-running child process and return a handle.
96///
97/// The background waiter calls `push_pending_feedback_global` when the
98/// process exits so the next agent-loop turn sees the result.
99pub fn spawn_long_running(
100    builtin: &'static str,
101    program: String,
102    args: Vec<String>,
103    cwd: Option<PathBuf>,
104    env: BTreeMap<String, String>,
105    session_id: String,
106) -> Result<LongRunningHandleInfo, HostlibError> {
107    if program.is_empty() {
108        return Err(HostlibError::InvalidParameter {
109            builtin,
110            param: "argv",
111            message: "first element of argv must be a non-empty program name".to_string(),
112        });
113    }
114
115    let mut command =
116        process_sandbox::std_command_for(&program, &args).map_err(|e| HostlibError::Backend {
117            builtin,
118            message: format!("sandbox setup failed: {e:?}"),
119        })?;
120
121    if let Some(cwd_path) = cwd.as_ref() {
122        process_sandbox::enforce_process_cwd(cwd_path).map_err(|e| HostlibError::Backend {
123            builtin,
124            message: format!("sandbox cwd rejected: {e:?}"),
125        })?;
126        command.current_dir(cwd_path);
127    }
128
129    if !env.is_empty() {
130        command.env_clear();
131        for (key, value) in &env {
132            command.env(key, value);
133        }
134    }
135
136    command.stdout(Stdio::piped());
137    command.stderr(Stdio::piped());
138    command.stdin(Stdio::null());
139
140    let child = command.spawn().map_err(|e| {
141        if let Some(violation) = process_sandbox::process_spawn_error(&e) {
142            return HostlibError::Backend {
143                builtin,
144                message: format!("sandbox rejected spawn: {violation:?}"),
145            };
146        }
147        HostlibError::Backend {
148            builtin,
149            message: format!("spawn failed: {e}"),
150        }
151    })?;
152
153    let pid = child.id();
154    let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
155    let handle_id = format!("hto-{:x}-{id}", std::process::id());
156
157    let started_at_ms = SystemTime::now()
158        .duration_since(UNIX_EPOCH)
159        .map(|d| d.as_millis() as u64)
160        .unwrap_or(0);
161
162    let mut all_argv = vec![program.clone()];
163    all_argv.extend(args.iter().cloned());
164    let command_display = all_argv.join(" ");
165
166    let cancel_state = Arc::new(CancelState {
167        cancelled: AtomicBool::new(false),
168    });
169
170    {
171        let mut store = HANDLE_STORE
172            .lock()
173            .expect("long-running handle store poisoned");
174        store.entries.insert(
175            handle_id.clone(),
176            HandleEntry {
177                child: Some(child),
178                pid,
179                session_id: session_id.clone(),
180                cancel_state: cancel_state.clone(),
181            },
182        );
183    }
184
185    let waiter_handle_id = handle_id.clone();
186    let waiter_session_id = session_id;
187    std::thread::Builder::new()
188        .name(format!("hto-waiter-{waiter_handle_id}"))
189        .spawn(move || {
190            waiter_thread(waiter_handle_id, waiter_session_id, cancel_state);
191        })
192        .map_err(|e| HostlibError::Backend {
193            builtin,
194            message: format!("failed to spawn waiter thread: {e}"),
195        })?;
196
197    Ok(LongRunningHandleInfo {
198        handle_id,
199        started_at_ms,
200        command_display,
201    })
202}
203
204/// Background thread that waits for a child process and fires feedback.
205fn waiter_thread(handle_id: String, session_id: String, cancel_state: Arc<CancelState>) {
206    let waiter_start = std::time::Instant::now();
207
208    // Take the child out of the store. If the entry is already gone (i.e.
209    // cancel_handle ran and removed it before us), exit without action.
210    let mut child = {
211        let mut store = HANDLE_STORE
212            .lock()
213            .expect("long-running handle store poisoned");
214        match store.entries.get_mut(&handle_id) {
215            Some(entry) => match entry.child.take() {
216                Some(c) => c,
217                None => return, // already cancelled before we ran
218            },
219            None => return, // entry removed (cancelled before store insert — shouldn't happen)
220        }
221    };
222
223    // Drain stdout/stderr on separate threads to prevent pipe deadlock.
224    use std::io::Read;
225    let mut stdout_bytes = Vec::new();
226    let mut stderr_bytes = Vec::new();
227    let (out_tx, out_rx) = std::sync::mpsc::channel::<Vec<u8>>();
228    let (err_tx, err_rx) = std::sync::mpsc::channel::<Vec<u8>>();
229
230    if let Some(mut out) = child.stdout.take() {
231        std::thread::spawn(move || {
232            let _ = out.read_to_end(&mut stdout_bytes);
233            let _ = out_tx.send(stdout_bytes);
234        });
235    }
236    if let Some(mut err) = child.stderr.take() {
237        std::thread::spawn(move || {
238            let _ = err.read_to_end(&mut stderr_bytes);
239            let _ = err_tx.send(stderr_bytes);
240        });
241    }
242
243    let status = child.wait().ok();
244
245    let stdout = out_rx
246        .recv_timeout(Duration::from_secs(5))
247        .unwrap_or_default();
248    let stderr = err_rx
249        .recv_timeout(Duration::from_secs(5))
250        .unwrap_or_default();
251
252    // Remove our entry from the store.
253    {
254        let mut store = HANDLE_STORE
255            .lock()
256            .expect("long-running handle store poisoned");
257        store.entries.remove(&handle_id);
258    }
259
260    // If cancellation was requested, don't push feedback — the caller
261    // that cancelled doesn't want to receive a spurious tool_result.
262    if cancel_state.cancelled.load(Ordering::Acquire) {
263        return;
264    }
265
266    let (exit_code, signal_name) = match status {
267        Some(s) => decode_exit_status(s),
268        // wait() itself failed — treat as killed (extremely unusual).
269        None => (-1, Some("SIGKILL".to_string())),
270    };
271    let duration_ms = waiter_start.elapsed().as_millis() as i64;
272
273    let mut payload = serde_json::Map::new();
274    payload.insert("handle_id".into(), serde_json::Value::String(handle_id));
275    payload.insert(
276        "exit_code".into(),
277        serde_json::Value::Number(exit_code.into()),
278    );
279    payload.insert(
280        "stdout".into(),
281        serde_json::Value::String(String::from_utf8_lossy(&stdout).into_owned()),
282    );
283    payload.insert(
284        "stderr".into(),
285        serde_json::Value::String(String::from_utf8_lossy(&stderr).into_owned()),
286    );
287    payload.insert(
288        "duration_ms".into(),
289        serde_json::Value::Number(duration_ms.into()),
290    );
291    if let Some(sig) = signal_name {
292        payload.insert("signal".into(), serde_json::Value::String(sig));
293    } else {
294        payload.insert("signal".into(), serde_json::Value::Null);
295    }
296
297    let content = serde_json::to_string(&payload).unwrap_or_default();
298    harn_vm::push_pending_feedback_global(&session_id, "tool_result", &content);
299}
300
301/// Cancel a specific in-flight long-running handle. Kills the process and
302/// removes the entry. Returns `true` if the handle was found and cancelled.
303pub fn cancel_handle(handle_id: &str) -> bool {
304    let (pid, child, cancel_state) = {
305        let mut store = HANDLE_STORE
306            .lock()
307            .expect("long-running handle store poisoned");
308        match store.entries.remove(handle_id) {
309            None => return false,
310            Some(mut entry) => (entry.pid, entry.child.take(), entry.cancel_state.clone()),
311        }
312    };
313    do_kill(pid, child, cancel_state);
314    true
315}
316
317/// Cancel all in-flight handles for a given session. Called by the
318/// session-end hook to avoid orphaned processes.
319pub fn cancel_session_handles(session_id: &str) {
320    let to_kill: Vec<(u32, Option<Child>, Arc<CancelState>)> = {
321        let mut store = HANDLE_STORE
322            .lock()
323            .expect("long-running handle store poisoned");
324        let matching: Vec<String> = store
325            .entries
326            .iter()
327            .filter(|(_, e)| e.session_id == session_id)
328            .map(|(id, _)| id.clone())
329            .collect();
330        matching
331            .into_iter()
332            .filter_map(|id| {
333                store.entries.remove(&id).map(|mut e| {
334                    let child = e.child.take();
335                    (e.pid, child, e.cancel_state.clone())
336                })
337            })
338            .collect()
339    };
340    for (pid, child, cancel_state) in to_kill {
341        do_kill(pid, child, cancel_state);
342    }
343}
344
345/// Set the cancellation flag and kill the process. Used by both `cancel_handle`
346/// and `cancel_session_handles`.
347fn do_kill(pid: u32, child: Option<Child>, cancel_state: Arc<CancelState>) {
348    // Signal cancellation so the waiter (if still running) skips feedback.
349    cancel_state.cancelled.store(true, Ordering::Release);
350    if let Some(mut c) = child {
351        // Waiter hasn't taken the child yet — kill it directly.
352        kill_child(&mut c);
353    } else {
354        // Waiter already took the child; signal by PID.
355        kill_pid(pid);
356    }
357}
358
359/// Register the session-cleanup hook with harn-vm. Uses a `OnceLock` so the
360/// hook is registered exactly once even if `register_builtins` is called
361/// multiple times (e.g. in tests).
362pub(crate) fn register_cleanup_hook() {
363    static REGISTERED: OnceLock<()> = OnceLock::new();
364    REGISTERED.get_or_init(|| {
365        let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
366            cancel_session_handles(session_id);
367        });
368        harn_vm::register_session_end_hook(hook);
369    });
370}
371
/// Terminate `child` and reap it so it does not linger as a zombie.
fn kill_child(child: &mut Child) {
    // Errors are ignored on purpose: this is best-effort termination.
    let _kill_result = child.kill();
    let _wait_result = child.wait();
}
376
/// Send SIGKILL to the process with OS process ID `pid`.
///
/// This is the fallback for cancelling a process whose `Child` object is no
/// longer available to the caller. On Unix it calls `kill(2)` directly
/// through an `extern "C"` declaration (no `libc` crate needed). The return
/// value is ignored, so this is best effort.
///
/// IDs of 0 or above `i32::MAX` are ignored. `kill(2)` interprets 0 as "the
/// caller's own process group" and negative values, which a plain `u32 ->
/// i32` cast would produce, as process-group or "every process we may
/// signal" targets. A bad ID must never turn into a mass kill.
///
/// On non-Unix targets this is currently a no-op.
fn kill_pid(pid: u32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }
        const SIGKILL: i32 = 9;

        if pid == 0 || pid > i32::MAX as u32 {
            return;
        }
        // SAFETY: kill(2) only reads its two integer arguments. `pid` is a
        // positive value that fits in pid_t (an i32 on Unix targets), and
        // SIGKILL is 9 on every Unix.
        unsafe {
            kill(pid as i32, SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        // No-op: killing by PID would require TerminateProcess (winapi).
        let _ = pid;
    }
}
396
/// Split an `ExitStatus` into `(exit_code, signal_name)` for the feedback
/// payload.
///
/// A normal exit yields `(code, None)`. On Unix, a child terminated by a
/// signal yields `(-1, Some(name))`. `name` is the conventional `SIG*` name
/// (e.g. `"SIGKILL"`, `"SIGTERM"`) for the common signals whose numbers are
/// identical on Linux, macOS and the BSDs, and `"SIG<n>"` for any other
/// number. This matches the `"SIGKILL"` spelling the waiter uses when
/// waiting fails. A status that is neither an exit nor a signal yields
/// `(-1, None)`. On non-Unix targets the exit code is used directly, or `-1`
/// if there is none.
fn decode_exit_status(status: std::process::ExitStatus) -> (i32, Option<String>) {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        if let Some(code) = status.code() {
            return (code, None);
        }
        if let Some(sig) = status.signal() {
            return (-1, Some(unix_signal_name(sig)));
        }
        (-1, None)
    }
    #[cfg(not(unix))]
    (status.code().unwrap_or(-1), None)
}

/// Conventional name for a Unix signal number. Only numbers that mean the
/// same signal on Linux, macOS and the BSDs are named; everything else is
/// rendered as `SIG<n>`.
#[cfg(unix)]
fn unix_signal_name(sig: i32) -> String {
    let name = match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        8 => "SIGFPE",
        9 => "SIGKILL",
        11 => "SIGSEGV",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        _ => return format!("SIG{sig}"),
    };
    name.to_string()
}