Skip to main content

running_process/
lib.rs

1//! Cross-platform process execution, process-tree control, PTY handling, and
2//! broker integration primitives.
3//!
4//! The crate exposes a synchronous process API through [`NativeProcess`], a
5//! contained process-group helper through [`ContainedProcessGroup`], low-level
6//! spawn helpers through [`spawn()`] and [`spawn_daemon`], and optional
7//! daemon/broker modules behind feature flags.
8
9use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17use crate::observer::ObserverEmitter;
18
19pub mod console_detect;
20pub mod containment;
21mod helpers;
22// Phase 1 of #221: process-observation capability model + portable
23// lifecycle baseline. Core-feature-clean (std-only: mpsc + SystemTime),
24// so the started/exited baseline is available to the base library
25// without pulling in the daemon runtime.
26pub mod observer;
27#[cfg(feature = "originator-scan")]
28pub mod originator;
29// Wave 3+4 of #165: proto module + IPC client absorbed from the
30// former `running-process-proto` and `running-process-client` crates.
31// Both gated behind `feature = "client"`. The protobuf package
32// `running_process.daemon.v1` compiles to the file referenced below.
#[cfg(feature = "client")]
/// Prost-generated daemon protocol types used by the client transport.
///
/// Only compiled with the `client` feature, which is also the feature that
/// makes `prost` available.
pub mod proto {
    /// Generated Rust bindings for the `running_process.daemon.v1` protobuf package.
    ///
    /// The bindings are read from `$OUT_DIR/running_process.daemon.v1.rs` at
    /// compile time (presumably written by the crate's build script — confirm
    /// in `build.rs`).
    // Generated code is not expected to carry rustdoc, so `missing_docs` is
    // silenced for this module only.
    #[allow(missing_docs)]
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
42
43#[cfg(feature = "client")]
44pub mod client;
45
46// Phase 0 of #228: v1 broker module — prost-generated wire types from
47// `proto/broker_v1_*.proto`. Gated behind `feature = "client"` because
48// prost itself is optional under that feature. Schemas are
49// FROZEN FOREVER once v1.0 ships.
50#[cfg(feature = "client")]
51pub mod broker;
52
53// Phase 1 of #228 (issue #230): maintenance subcommands exposed via
54// the `runpm` CLI. Currently just `release-handles` — a cross-platform
55// scaffold for the Windows worktree-teardown handle-race fix
56// (soldr#710). Gated behind `feature = "client"` because the CLI that
57// drives it is.
58#[cfg(feature = "client")]
59pub mod maintenance;
60
61#[cfg(feature = "client")]
62pub mod cleanup;
63
64// Phase 4 of #222 (#427): per-OS boot autostart for the runpm daemon.
65// Gated behind `feature = "client"` because the only consumer is the
66// `runpm` CLI binary, which is itself client-gated.
67#[cfg(feature = "client")]
68pub mod boot_autostart;
69
70// Phase 5 of #222 (#428): `runpm.toml` parser used by the `runpm` CLI
71// to batch-start `[[app]]` entries. Lives in the library (not under
72// `src/bin/`) so the integration test in `tests/runpm_toml_config.rs`
73// can drive the same code path the binary uses.
74#[cfg(feature = "client")]
75pub mod runpm_config;
76
77// #415: consumer-consumable conformance test kit. Gated behind the
78// off-by-default `test-support` cargo feature (which implies `client`)
79// so the helpers ship in the published crate but only compile when a
80// consumer opts in as a dev-dependency.
81#[cfg(feature = "test-support")]
82pub mod test_support;
83
84// Lightweight tee sink primitives for callers that want transcript/log
85// fan-out without pulling in the full daemon runtime.
86#[cfg(feature = "telemetry")]
87#[path = "daemon/telemetry.rs"]
88pub mod telemetry;
89
90// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
91// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
92#[cfg(feature = "daemon")]
93/// Daemon runtime APIs and helpers enabled by the `daemon` feature.
94pub mod daemon;
95/// PTY-backed process APIs.
96pub mod pty;
97mod public_symbols;
98mod rust_debug;
99pub mod spawn;
100pub mod systemd_killmode;
101pub mod terminal_graphics;
102mod types;
103#[cfg(unix)]
104mod unix;
105#[cfg(windows)]
106mod windows;
107
108pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
109pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
110pub use observer::{
111    CapabilitySupport, CategoryCapability, EventCategory, ObserverCapabilities, ObserverConfig,
112    ObserverEvent, ObserverEventKind, ObserverSubscriber,
113};
114#[cfg(feature = "originator-scan")]
115pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
116pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
117pub use spawn::{
118    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
119    StdioSource,
120};
121pub use terminal_graphics::{
122    current_terminal_capabilities, current_terminal_capabilities_with_timeout,
123    detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
124    GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
125    TerminalProbeEvidence,
126};
127pub use types::{
128    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
129    StreamEvent, StreamKind,
130};
131
132pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
133#[cfg(unix)]
134pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
135#[cfg(windows)]
136pub(crate) use windows::{
137    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
138    WindowsJobHandle,
139};
140
/// Open a scoped Rust debug trace that covers the rest of the enclosing block.
///
/// Expands to a `let` statement that stores a
/// [`RustDebugScopeGuard`](crate::RustDebugScopeGuard) created from the given
/// label plus the call site's `file!()` and `line!()`. The binding uses a
/// named, underscore-prefixed pattern rather than `_`. As a result the guard
/// stays alive until the enclosing scope ends instead of being dropped
/// immediately.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($scope_label:expr) => {
        let _rp_rust_debug_scope_guard =
            $crate::RustDebugScopeGuard::enter($scope_label, file!(), line!());
    };
}
149
150#[derive(Default)]
151struct QueueState {
152    stdout_queue: VecDeque<Vec<u8>>,
153    stderr_queue: VecDeque<Vec<u8>>,
154    combined_queue: VecDeque<StreamEvent>,
155    stdout_history: VecDeque<Vec<u8>>,
156    stderr_history: VecDeque<Vec<u8>>,
157    combined_history: VecDeque<StreamEvent>,
158    stdout_raw: Vec<u8>,
159    stderr_raw: Vec<u8>,
160    stdout_history_bytes: usize,
161    stderr_history_bytes: usize,
162    combined_history_bytes: usize,
163    stdout_closed: bool,
164    stderr_closed: bool,
165}
166
167/// Sentinel value for returncode atomic: process has not exited yet.
168const RETURNCODE_NOT_SET: i64 = i64::MIN;
169
170struct SharedState {
171    queues: Mutex<QueueState>,
172    condvar: Condvar,
173    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
174    /// Updated by a background waiter thread — reading is lock-free.
175    returncode: AtomicI64,
176    /// Phase 1 of #221: optional lifecycle-event emitter. `None` means
177    /// observation is off (the off-by-default path), so the lifecycle
178    /// hooks are inert. When `Some`, `started` is emitted once at spawn
179    /// and `exited` exactly once on the first returncode transition.
180    observer: Option<ObserverEmitter>,
181    /// Guards against emitting more than one `exited` event when several
182    /// code paths (waiter thread, `poll`, `kill`) race to record the exit.
183    observer_exit_emitted: AtomicBool,
184}
185
186struct ChildState {
187    child: Child,
188    #[cfg(windows)]
189    _job: WindowsJobHandle,
190}
191
192impl SharedState {
193    fn new(capture: bool) -> Self {
194        Self::with_observer(capture, None)
195    }
196
197    fn with_observer(capture: bool, observer: Option<ObserverEmitter>) -> Self {
198        let queues = QueueState {
199            stdout_closed: !capture,
200            stderr_closed: !capture,
201            ..QueueState::default()
202        };
203        Self {
204            queues: Mutex::new(queues),
205            condvar: Condvar::new(),
206            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
207            observer,
208            observer_exit_emitted: AtomicBool::new(false),
209        }
210    }
211
212    /// Emit the lifecycle `exited` event exactly once, regardless of which
213    /// code path first observes the exit. No-op when observation is off.
214    fn emit_exited(&self, pid: u32, exit_code: i32) {
215        let Some(emitter) = self.observer.as_ref() else {
216            return;
217        };
218        if self
219            .observer_exit_emitted
220            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
221            .is_ok()
222        {
223            emitter.emit_exited(pid, exit_code);
224        }
225    }
226}
227
/// A cross-platform child process with optional output capture.
///
/// `NativeProcess` wraps [`std::process::Command`] with the crate's
/// process-tree containment, capture draining, timeout, and terminal-control
/// behavior. Methods are synchronous and are safe to call from ordinary
/// blocking code.
pub struct NativeProcess {
    /// Spawn configuration supplied at construction.
    config: ProcessConfig,
    /// The spawned child; `None` until [`Self::start`] succeeds. Cloned into
    /// the background exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues, exit code and observer state shared with the reader
    /// and exit-waiter threads.
    shared: Arc<SharedState>,
    /// Raw stdout/stderr pipe handles recorded at start (Windows only),
    /// presumably consumed by `cancel_capture_io` when the child is killed.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
241
242impl NativeProcess {
243    /// Create a process wrapper from a [`ProcessConfig`].
244    ///
245    /// The child is not spawned until [`Self::start`] is called. Process
246    /// observation is **off by default**: no lifecycle events are emitted
247    /// unless [`Self::with_observer`] is used instead.
248    pub fn new(config: ProcessConfig) -> Self {
249        Self::new_with_observer(config, None)
250    }
251
252    /// Create a process wrapper with process observation enabled (Phase 1
253    /// of #221).
254    ///
255    /// Returns the wrapper paired with an [`ObserverSubscriber`] that
256    /// receives a [`started`](crate::ObserverEventKind::Started) event when
257    /// [`Self::start`] spawns the child and exactly one
258    /// [`exited`](crate::ObserverEventKind::Exited) event when the child is
259    /// reaped — for the categories the `config` requests that are actually
260    /// `Supported` (only [`Lifecycle`](crate::EventCategory::Lifecycle) in
261    /// Phase 1; see [`ObserverCapabilities::negotiate`](crate::ObserverCapabilities::negotiate)).
262    ///
263    /// The emitter never blocks on a slow or dropped subscriber.
264    pub fn with_observer(
265        config: ProcessConfig,
266        observer: crate::observer::ObserverConfig,
267    ) -> (Self, ObserverSubscriber) {
268        let (emitter, subscriber) = ObserverEmitter::new(observer);
269        let process = Self::new_with_observer(config, Some(emitter));
270        (process, subscriber)
271    }
272
273    fn new_with_observer(config: ProcessConfig, observer: Option<ObserverEmitter>) -> Self {
274        let shared = match observer {
275            // Off-by-default path: no emitter, no observation state.
276            None => SharedState::new(config.capture),
277            Some(emitter) => SharedState::with_observer(config.capture, Some(emitter)),
278        };
279        Self {
280            shared: Arc::new(shared),
281            child: Arc::new(Mutex::new(None)),
282            config,
283            #[cfg(windows)]
284            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
285        }
286    }
287
288    // Preserve a stable Rust frame here in release user dumps.
289    #[inline(never)]
290    /// Spawn the configured child process.
291    ///
292    /// Returns [`ProcessError::AlreadyStarted`] if the same wrapper already
293    /// owns a running child.
294    pub fn start(&self) -> Result<(), ProcessError> {
295        public_symbols::rp_native_process_start_public(self)
296    }
297
298    fn start_impl(&self) -> Result<(), ProcessError> {
299        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
300        let mut guard = self.child.lock().expect("child mutex poisoned");
301        if guard.is_some() {
302            return Err(ProcessError::AlreadyStarted);
303        }
304
305        let mut command = self.build_command();
306        match self.config.stdin_mode {
307            StdinMode::Inherit => {}
308            StdinMode::Piped => {
309                command.stdin(Stdio::piped());
310            }
311            StdinMode::Null => {
312                command.stdin(Stdio::null());
313            }
314        }
315        if self.config.capture {
316            command.stdout(Stdio::piped());
317            command.stderr(Stdio::piped());
318        }
319
320        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
321        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
322        // Phase 1 of #221: emit the lifecycle `started` event. No-op when
323        // observation is off (the common, off-by-default path).
324        if let Some(emitter) = self.shared.observer.as_ref() {
325            emitter.emit_started(child.id());
326        }
327        #[cfg(windows)]
328        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
329            .map_err(ProcessError::Spawn)?;
330        if self.config.capture {
331            let stdout = child.stdout.take().expect("stdout pipe missing");
332            let stderr = child.stderr.take().expect("stderr pipe missing");
333            #[cfg(windows)]
334            {
335                use std::os::windows::io::AsRawHandle;
336                let mut handles = self
337                    .capture_pipe_handles
338                    .lock()
339                    .expect("capture pipe handles mutex poisoned");
340                handles.stdout = Some(stdout.as_raw_handle() as usize);
341                handles.stderr = Some(stderr.as_raw_handle() as usize);
342            }
343            self.spawn_reader(
344                stdout,
345                StreamKind::Stdout,
346                StreamKind::Stdout,
347                self.pipe_done_callback(StreamKind::Stdout),
348            );
349            self.spawn_reader(
350                stderr,
351                StreamKind::Stderr,
352                match self.config.stderr_mode {
353                    StderrMode::Stdout => StreamKind::Stdout,
354                    StderrMode::Pipe => StreamKind::Stderr,
355                },
356                self.pipe_done_callback(StreamKind::Stderr),
357            );
358        }
359        *guard = Some(ChildState {
360            child,
361            #[cfg(windows)]
362            _job: job,
363        });
364        drop(guard);
365        self.spawn_exit_waiter();
366        Ok(())
367    }
368
369    /// Background thread that polls for process exit and stores the exit code
370    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
371    fn spawn_exit_waiter(&self) {
372        let child = Arc::clone(&self.child);
373        let shared = Arc::clone(&self.shared);
374        thread::spawn(move || {
375            loop {
376                if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
377                    return;
378                }
379                {
380                    let mut guard = child.lock().expect("child mutex poisoned");
381                    if let Some(child_state) = guard.as_mut() {
382                        let pid = child_state.child.id();
383                        match child_state.child.try_wait() {
384                            Ok(Some(status)) => {
385                                let code = exit_code(status);
386                                shared.returncode.store(code as i64, Ordering::Release);
387                                // Phase 1 of #221: lifecycle `exited`. Emit
388                                // before notifying waiters and is guarded so
389                                // only the first exit-observer fires.
390                                shared.emit_exited(pid, code);
391                                shared.condvar.notify_all();
392                                return;
393                            }
394                            Ok(None) => {}
395                            Err(_) => return,
396                        }
397                    } else {
398                        return;
399                    }
400                }
401                // #199: intentional — capture thread polling for
402                // child-exit. `try_wait` is non-blocking by design;
403                // we can't block here because the thread also drains
404                // pipe state alongside the exit check. 10ms keeps the
405                // CPU cost negligible while staying responsive.
406                thread::sleep(Duration::from_millis(10));
407            }
408        });
409    }
410
411    /// Write bytes to the child's stdin and then close stdin.
412    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
413        let mut guard = self.child.lock().expect("child mutex poisoned");
414        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
415        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
416        use std::io::Write;
417        stdin.write_all(data).map_err(ProcessError::Io)?;
418        stdin.flush().map_err(ProcessError::Io)?;
419        drop(child.stdin.take());
420        Ok(())
421    }
422
423    /// Write to the child's stdin without closing it afterwards, so the
424    /// caller can issue additional writes. Used by interactive
425    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
426    /// stdin open across multiple client input frames.
427    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
428        let mut guard = self.child.lock().expect("child mutex poisoned");
429        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
430        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
431        use std::io::Write;
432        stdin.write_all(data).map_err(ProcessError::Io)?;
433        stdin.flush().map_err(ProcessError::Io)?;
434        Ok(())
435    }
436
437    /// Explicitly close the child's stdin (signals EOF to the child).
438    /// Idempotent: returns Ok if stdin was already closed.
439    pub fn close_stdin(&self) -> Result<(), ProcessError> {
440        let mut guard = self.child.lock().expect("child mutex poisoned");
441        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
442        drop(child.stdin.take());
443        Ok(())
444    }
445
446    /// Check whether the child has exited without blocking.
447    ///
448    /// Returns `Ok(None)` while the process is still running.
449    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
450        // Fast path: check atomic set by background waiter thread.
451        if let Some(code) = self.returncode() {
452            return Ok(Some(code));
453        }
454        let mut guard = self.child.lock().expect("child mutex poisoned");
455        let Some(child_state) = guard.as_mut() else {
456            return Ok(self.returncode());
457        };
458        let pid = child_state.child.id();
459        let child = &mut child_state.child;
460        let status = child.try_wait().map_err(ProcessError::Io)?;
461        if let Some(status) = status {
462            let code = exit_code(status);
463            self.set_returncode(code);
464            self.shared.emit_exited(pid, code);
465            return Ok(Some(code));
466        }
467        Ok(None)
468    }
469
470    // Preserve a stable Rust frame here in release user dumps.
471    #[inline(never)]
472    /// Wait for the child to exit.
473    ///
474    /// When `timeout` is `Some`, returns [`ProcessError::Timeout`] if the
475    /// child does not exit before the duration elapses.
476    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
477        public_symbols::rp_native_process_wait_public(self, timeout)
478    }
479
480    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
481        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
482        if self.child.lock().expect("child mutex poisoned").is_none() {
483            return self.returncode().ok_or(ProcessError::NotRunning);
484        }
485        // Fast path: already exited.
486        if let Some(code) = self.returncode() {
487            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
488            return Ok(code);
489        }
490        let start = Instant::now();
491        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
492        loop {
493            // Check returncode (set by exit-waiter thread via atomic + condvar).
494            let rc = self.shared.returncode.load(Ordering::Acquire);
495            if rc != RETURNCODE_NOT_SET {
496                drop(guard);
497                let code = rc as i32;
498                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
499                return Ok(code);
500            }
501            if let Some(limit) = timeout {
502                let elapsed = start.elapsed();
503                if elapsed >= limit {
504                    return Err(ProcessError::Timeout);
505                }
506                let remaining = limit - elapsed;
507                // Wait on condvar with timeout, capped at 50ms to recheck.
508                let wait_time = remaining.min(Duration::from_millis(50));
509                guard = self
510                    .shared
511                    .condvar
512                    .wait_timeout(guard, wait_time)
513                    .expect("queue mutex poisoned")
514                    .0;
515            } else {
516                // Wait on condvar with periodic recheck.
517                guard = self
518                    .shared
519                    .condvar
520                    .wait_timeout(guard, Duration::from_millis(50))
521                    .expect("queue mutex poisoned")
522                    .0;
523            }
524        }
525    }
526
527    // Preserve a stable Rust frame here in release user dumps.
528    #[inline(never)]
529    /// Forcefully terminate the child process.
530    pub fn kill(&self) -> Result<(), ProcessError> {
531        public_symbols::rp_native_process_kill_public(self)
532    }
533
534    fn kill_impl(&self) -> Result<(), ProcessError> {
535        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
536        {
537            let mut guard = self.child.lock().expect("child mutex poisoned");
538            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
539            let pid = child.id();
540            child.kill().map_err(ProcessError::Io)?;
541            let status = child.wait().map_err(ProcessError::Io)?;
542            let code = exit_code(status);
543            self.set_returncode(code);
544            // Phase 1 of #221: a killed child still produces a lifecycle
545            // `exited` event (guarded against double-emit by the waiter).
546            self.shared.emit_exited(pid, code);
547        }
548        // On Windows, interrupt any pending blocking `read()` in the
549        // per-stream reader threads so they fall out of their loops
550        // immediately. This is what makes the grandchild-pipe-orphan
551        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
552        // that inherits the pipe and outlives uv) wake up in
553        // microseconds instead of waiting for the bounded-drain
554        // safety-net deadline below.
555        #[cfg(windows)]
556        self.cancel_capture_io();
557        // Synchronize with the per-stream reader threads so that by the
558        // time kill() returns, the capture queues have flipped from
559        // "blocked on read" to "closed" and downstream pollers (e.g.
560        // take_combined_line) observe EOS instead of timeout. Without
561        // this, callers that hit a wait()-timeout path see Python code
562        // raise TimeoutError, kill the child, then race the reader
563        // threads — a 10ms poll loop can miss the EOS flip entirely.
564        //
565        // The deadline is the safety-net: on Windows `CancelIoEx`
566        // above almost always wakes the readers first; on POSIX, and
567        // in any pathological Windows case where `CancelIoEx` doesn't
568        // fire, the deadline guarantees `kill()` still returns.
569        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
570            self,
571            kill_drain_deadline(),
572        );
573        Ok(())
574    }
575
576    /// Terminate the child process.
577    ///
578    /// This currently uses the same hard-kill path as [`Self::kill`].
579    pub fn terminate(&self) -> Result<(), ProcessError> {
580        self.kill()
581    }
582
583    /// Send the OS-appropriate soft termination signal to the child's
584    /// process group (POSIX: SIGTERM to `-pid`; Windows: no soft path
585    /// implemented yet — returns Ok without doing anything so callers
586    /// can run the same code on both platforms and rely on the post-
587    /// grace hard kill).
588    ///
589    /// Requires `ProcessConfig.create_process_group=true` on POSIX so
590    /// that `-pid` resolves to the child's own group. With the default
591    /// `create_process_group=false`, the kill would walk back to the
592    /// caller's group; the method silently no-ops in that case to avoid
593    /// signaling the wrong tree.
594    ///
595    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
596    /// that `TerminationOutcome::SoftExit` becomes meaningful on POSIX.
597    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
598        #[cfg(unix)]
599        {
600            if !self.config.create_process_group {
601                return Ok(());
602            }
603            let pid = match self.pid() {
604                Some(p) => p as i32,
605                None => return Err(ProcessError::NotRunning),
606            };
607            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
608            if result != 0 {
609                let err = std::io::Error::last_os_error();
610                if err.raw_os_error() != Some(libc::ESRCH) {
611                    return Err(ProcessError::Io(err));
612                }
613            }
614            Ok(())
615        }
616        #[cfg(windows)]
617        {
618            if !self.config.create_process_group {
619                // GenerateConsoleCtrlEvent only routes to children
620                // spawned with CREATE_NEW_PROCESS_GROUP, and the
621                // event would otherwise hit the daemon's own group.
622                // No-op so the hard-kill schedule still wins.
623                return Ok(());
624            }
625            let pid = match self.pid() {
626                Some(p) => p,
627                None => return Err(ProcessError::NotRunning),
628            };
629            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
630            // SAFETY: the FFI call is the standard Windows API; no
631            // borrowed Rust state is involved.
632            let ok = unsafe {
633                winapi::um::wincon::GenerateConsoleCtrlEvent(
634                    winapi::um::wincon::CTRL_BREAK_EVENT,
635                    pid,
636                )
637            };
638            if ok == 0 {
639                let err = std::io::Error::last_os_error();
640                // ERROR_INVALID_HANDLE means the child has already
641                // exited or has detached from the console — treat as
642                // success because the soft step's only goal is to
643                // give the child a chance to exit cleanly, and a
644                // dead/detached child does not need one.
645                if err.raw_os_error() != Some(6) {
646                    return Err(ProcessError::Io(err));
647                }
648            }
649            Ok(())
650        }
651    }
652
653    // Preserve a stable Rust frame here in release user dumps.
654    #[inline(never)]
655    /// Close the process wrapper by terminating the child when it is running.
656    pub fn close(&self) -> Result<(), ProcessError> {
657        public_symbols::rp_native_process_close_public(self)
658    }
659
660    fn close_impl(&self) -> Result<(), ProcessError> {
661        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
662        if self.child.lock().expect("child mutex poisoned").is_none() {
663            return Ok(());
664        }
665        if self.poll()?.is_none() {
666            self.kill()?;
667        } else {
668            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
669        }
670        Ok(())
671    }
672
673    /// Return the child process id when the wrapper currently owns a child.
674    pub fn pid(&self) -> Option<u32> {
675        self.child
676            .lock()
677            .expect("child mutex poisoned")
678            .as_ref()
679            .map(|state| state.child.id())
680    }
681
682    /// Return the cached exit code when the child has exited.
683    pub fn returncode(&self) -> Option<i32> {
684        let v = self.shared.returncode.load(Ordering::Acquire);
685        if v == RETURNCODE_NOT_SET {
686            None
687        } else {
688            Some(v as i32)
689        }
690    }
691
692    /// Return whether captured output is queued for one stream.
693    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
694        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
695            return false;
696        }
697        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
698        match stream {
699            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
700            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
701        }
702    }
703
704    /// Return whether captured combined output is queued.
705    pub fn has_pending_combined(&self) -> bool {
706        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
707        !guard.combined_queue.is_empty()
708    }
709
710    /// Drain and return all queued output for one stream.
711    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
712        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
713            return Vec::new();
714        }
715        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
716        let queue = match stream {
717            StreamKind::Stdout => &mut guard.stdout_queue,
718            StreamKind::Stderr => &mut guard.stderr_queue,
719        };
720        queue.drain(..).collect()
721    }
722
723    /// Drain and return all queued combined output events.
724    pub fn drain_combined(&self) -> Vec<StreamEvent> {
725        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
726        guard.combined_queue.drain(..).collect()
727    }
728
729    /// Read the next captured chunk from one stream.
730    ///
731    /// Returns [`ReadStatus::Timeout`] when `timeout` elapses before output or
732    /// EOF is observed.
733    pub fn read_stream(
734        &self,
735        stream: StreamKind,
736        timeout: Option<Duration>,
737    ) -> ReadStatus<Vec<u8>> {
738        let deadline = timeout.map(|limit| Instant::now() + limit);
739        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
740
741        loop {
742            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
743                return ReadStatus::Eof;
744            }
745
746            let queue = match stream {
747                StreamKind::Stdout => &mut guard.stdout_queue,
748                StreamKind::Stderr => &mut guard.stderr_queue,
749            };
750            if let Some(line) = queue.pop_front() {
751                return ReadStatus::Line(line);
752            }
753
754            let closed = match stream {
755                StreamKind::Stdout => {
756                    if self.config.stderr_mode == StderrMode::Stdout {
757                        guard.stdout_closed && guard.stderr_closed
758                    } else {
759                        guard.stdout_closed
760                    }
761                }
762                StreamKind::Stderr => guard.stderr_closed,
763            };
764            if closed {
765                return ReadStatus::Eof;
766            }
767
768            match deadline {
769                Some(deadline) => {
770                    let now = Instant::now();
771                    if now >= deadline {
772                        return ReadStatus::Timeout;
773                    }
774                    let wait = deadline.saturating_duration_since(now);
775                    let result = self
776                        .shared
777                        .condvar
778                        .wait_timeout(guard, wait)
779                        .expect("queue mutex poisoned");
780                    guard = result.0;
781                    if result.1.timed_out() {
782                        return ReadStatus::Timeout;
783                    }
784                }
785                None => {
786                    guard = self
787                        .shared
788                        .condvar
789                        .wait(guard)
790                        .expect("queue mutex poisoned");
791                }
792            }
793        }
794    }
795
796    // Preserve a stable Rust frame here in release user dumps.
797    #[inline(never)]
798    /// Read the next captured combined stream event.
799    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
800        public_symbols::rp_native_process_read_combined_public(self, timeout)
801    }
802
803    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
804        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
805        let deadline = timeout.map(|limit| Instant::now() + limit);
806        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
807
808        loop {
809            if let Some(event) = guard.combined_queue.pop_front() {
810                return ReadStatus::Line(event);
811            }
812            if guard.stdout_closed && guard.stderr_closed {
813                return ReadStatus::Eof;
814            }
815
816            match deadline {
817                Some(deadline) => {
818                    let now = Instant::now();
819                    if now >= deadline {
820                        return ReadStatus::Timeout;
821                    }
822                    let wait = deadline.saturating_duration_since(now);
823                    let result = self
824                        .shared
825                        .condvar
826                        .wait_timeout(guard, wait)
827                        .expect("queue mutex poisoned");
828                    guard = result.0;
829                    if result.1.timed_out() {
830                        return ReadStatus::Timeout;
831                    }
832                }
833                None => {
834                    guard = self
835                        .shared
836                        .condvar
837                        .wait(guard)
838                        .expect("queue mutex poisoned");
839                }
840            }
841        }
842    }
843
844    /// Return the retained stdout history.
845    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
846        self.shared
847            .queues
848            .lock()
849            .expect("queue mutex poisoned")
850            .stdout_history
851            .clone()
852            .into_iter()
853            .collect()
854    }
855
856    fn captured_stdout_raw(&self) -> Vec<u8> {
857        self.shared
858            .queues
859            .lock()
860            .expect("queue mutex poisoned")
861            .stdout_raw
862            .clone()
863    }
864
865    /// Return the retained stderr history.
866    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
867        if self.config.stderr_mode == StderrMode::Stdout {
868            return Vec::new();
869        }
870        self.shared
871            .queues
872            .lock()
873            .expect("queue mutex poisoned")
874            .stderr_history
875            .clone()
876            .into_iter()
877            .collect()
878    }
879
880    fn captured_stderr_raw(&self) -> Vec<u8> {
881        if self.config.stderr_mode == StderrMode::Stdout {
882            return Vec::new();
883        }
884        self.shared
885            .queues
886            .lock()
887            .expect("queue mutex poisoned")
888            .stderr_raw
889            .clone()
890    }
891
892    /// Return the retained combined stdout/stderr event history.
893    pub fn captured_combined(&self) -> Vec<StreamEvent> {
894        self.shared
895            .queues
896            .lock()
897            .expect("queue mutex poisoned")
898            .combined_history
899            .clone()
900            .into_iter()
901            .collect()
902    }
903
904    /// Return the retained byte count for one captured stream.
905    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
906        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
907            return 0;
908        }
909        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
910        match stream {
911            StreamKind::Stdout => guard.stdout_history_bytes,
912            StreamKind::Stderr => guard.stderr_history_bytes,
913        }
914    }
915
916    /// Return the retained byte count for combined captured output.
917    pub fn captured_combined_bytes(&self) -> usize {
918        self.shared
919            .queues
920            .lock()
921            .expect("queue mutex poisoned")
922            .combined_history_bytes
923    }
924
925    /// Clear retained output history for one stream and return freed bytes.
926    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
927        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
928            return 0;
929        }
930        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
931        match stream {
932            StreamKind::Stdout => {
933                let released = guard.stdout_history_bytes;
934                guard.stdout_history.clear();
935                guard.stdout_raw.clear();
936                guard.stdout_history_bytes = 0;
937                released
938            }
939            StreamKind::Stderr => {
940                let released = guard.stderr_history_bytes;
941                guard.stderr_history.clear();
942                guard.stderr_raw.clear();
943                guard.stderr_history_bytes = 0;
944                released
945            }
946        }
947    }
948
949    /// Clear retained combined output history and return freed bytes.
950    pub fn clear_captured_combined(&self) -> usize {
951        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
952        let released = guard.combined_history_bytes;
953        guard.combined_history.clear();
954        guard.combined_history_bytes = 0;
955        released
956    }
957
958    fn build_command(&self) -> Command {
959        let mut command = match &self.config.command {
960            CommandSpec::Shell(command) => shell_command(command),
961            CommandSpec::Argv(argv) => {
962                let mut command = Command::new(&argv[0]);
963                if argv.len() > 1 {
964                    command.args(&argv[1..]);
965                }
966                command
967            }
968        };
969        if let Some(cwd) = &self.config.cwd {
970            command.current_dir(cwd);
971        }
972        if let Some(env) = &self.config.env {
973            command.env_clear();
974            command.envs(env.iter().map(|(k, v)| (k, v)));
975        }
976        #[cfg(windows)]
977        {
978            use std::os::windows::process::CommandExt;
979
980            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
981            // with CTRL_BREAK_EVENT route to this child's group
982            // (rather than the daemon's group) — required for the
983            // pipe-session soft-signal path on Windows (#130 M4).
984            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
985            let extra = if self.config.create_process_group {
986                CREATE_NEW_PROCESS_GROUP
987            } else {
988                0
989            };
990            let flags = self.config.creationflags.unwrap_or(0)
991                | extra
992                | windows_priority_flags(self.config.nice);
993            if flags != 0 {
994                command.creation_flags(flags);
995            }
996        }
997        #[cfg(unix)]
998        {
999            let create_process_group = self.config.create_process_group;
1000            let nice = self.config.nice;
1001
1002            if create_process_group || nice.is_some() {
1003                use std::os::unix::process::CommandExt;
1004
1005                unsafe {
1006                    command.pre_exec(move || {
1007                        if create_process_group && libc::setpgid(0, 0) == -1 {
1008                            return Err(std::io::Error::last_os_error());
1009                        }
1010                        if let Some(nice) = nice {
1011                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
1012                            if result == -1 {
1013                                return Err(std::io::Error::last_os_error());
1014                            }
1015                        }
1016                        Ok(())
1017                    });
1018                }
1019            }
1020        }
1021        command
1022    }
1023
1024    fn spawn_reader<R>(
1025        &self,
1026        pipe: R,
1027        source_stream: StreamKind,
1028        visible_stream: StreamKind,
1029        on_pipe_done: Box<dyn FnOnce() + Send>,
1030    ) where
1031        R: Read + Send + 'static,
1032    {
1033        let shared = Arc::clone(&self.shared);
1034        thread::spawn(move || {
1035            let mut reader = pipe;
1036            let mut chunk = vec![0_u8; 65536];
1037            let mut pending = Vec::new();
1038
1039            loop {
1040                match reader.read(&mut chunk) {
1041                    Ok(0) => break,
1042                    Ok(n) => {
1043                        append_raw(&shared, visible_stream, &chunk[..n]);
1044                        let lines = feed_chunk(&mut pending, &chunk[..n]);
1045                        emit_lines(&shared, visible_stream, lines);
1046                    }
1047                    Err(_) => break,
1048                }
1049            }
1050
1051            if !pending.is_empty() {
1052                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
1053            }
1054
1055            // Clear the parent-side pipe-handle slot under its mutex
1056            // before dropping the reader. After this returns,
1057            // `kill_impl` can no longer try to `CancelIoEx` on us, so
1058            // it's safe for `reader`'s drop to close the HANDLE.
1059            on_pipe_done();
1060            drop(reader);
1061
1062            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1063            match source_stream {
1064                StreamKind::Stdout => guard.stdout_closed = true,
1065                StreamKind::Stderr => guard.stderr_closed = true,
1066            }
1067            shared.condvar.notify_all();
1068        });
1069    }
1070
1071    #[cfg(windows)]
1072    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1073        let handles = Arc::clone(&self.capture_pipe_handles);
1074        Box::new(move || {
1075            let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
1076            match stream {
1077                StreamKind::Stdout => guard.stdout = None,
1078                StreamKind::Stderr => guard.stderr = None,
1079            }
1080        })
1081    }
1082
1083    #[cfg(not(windows))]
1084    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1085        Box::new(|| {})
1086    }
1087
1088    /// Cancel any pending blocking `read()` on the parent-side capture
1089    /// pipes so the reader threads' `read()` calls return
1090    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
1091    /// break the grandchild-orphan deadlock without waiting on
1092    /// `wait_for_capture_completion_with_deadline`'s safety-net.
1093    #[cfg(windows)]
1094    fn cancel_capture_io(&self) {
1095        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
1096        use winapi::shared::ntdef::HANDLE;
1097        use winapi::um::ioapiset::CancelIoEx;
1098        let guard = self
1099            .capture_pipe_handles
1100            .lock()
1101            .expect("capture pipe handles mutex poisoned");
1102        if let Some(h) = guard.stdout {
1103            // SAFETY: the slot is `Some` only while the owning reader
1104            // thread still holds the `ChildStdout`, so the HANDLE is
1105            // valid for the duration of this call. The reader is
1106            // blocked in `lock()` on the same mutex if it's racing us
1107            // toward exit, so it cannot drop the pipe and close the
1108            // HANDLE until we return.
1109            unsafe {
1110                CancelIoEx(h as HANDLE, std::ptr::null_mut());
1111            }
1112        }
1113        if let Some(h) = guard.stderr {
1114            unsafe {
1115                CancelIoEx(h as HANDLE, std::ptr::null_mut());
1116            }
1117        }
1118    }
1119
1120    fn set_returncode(&self, code: i32) {
1121        self.shared.returncode.store(code as i64, Ordering::Release);
1122        self.shared.condvar.notify_all();
1123    }
1124
1125    fn wait_for_capture_completion_impl(&self) {
1126        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1127        if !self.config.capture {
1128            return;
1129        }
1130
1131        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1132        while !(guard.stdout_closed && guard.stderr_closed) {
1133            guard = self
1134                .shared
1135                .condvar
1136                .wait(guard)
1137                .expect("queue mutex poisoned");
1138        }
1139    }
1140
1141    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
1142    /// Returns `true` if the reader threads flipped both closed flags on
1143    /// their own, `false` if the deadline elapsed first. On timeout the
1144    /// closed flags are force-set (and waiters notified) so downstream
1145    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
1146    /// thread that eventually unblocks after the OS releases the pipe
1147    /// will assign `closed = true` again, which is a harmless no-op.
1148    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1149        crate::rp_rust_debug_scope!(
1150            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1151        );
1152        if !self.config.capture {
1153            return true;
1154        }
1155
1156        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1157        while !(guard.stdout_closed && guard.stderr_closed) {
1158            let now = Instant::now();
1159            if now >= deadline {
1160                guard.stdout_closed = true;
1161                guard.stderr_closed = true;
1162                self.shared.condvar.notify_all();
1163                return false;
1164            }
1165            let (next_guard, result) = self
1166                .shared
1167                .condvar
1168                .wait_timeout(guard, deadline - now)
1169                .expect("queue mutex poisoned");
1170            guard = next_guard;
1171            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1172                guard.stdout_closed = true;
1173                guard.stderr_closed = true;
1174                self.shared.condvar.notify_all();
1175                return false;
1176            }
1177        }
1178        true
1179    }
1180}
1181
1182fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1183    if lines.is_empty() {
1184        return;
1185    }
1186    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1187    for line in lines {
1188        let line_len = line.len();
1189        match stream {
1190            StreamKind::Stdout => {
1191                guard.stdout_history_bytes += line_len;
1192                guard.stdout_history.push_back(line.clone());
1193                guard.stdout_queue.push_back(line.clone());
1194            }
1195            StreamKind::Stderr => {
1196                guard.stderr_history_bytes += line_len;
1197                guard.stderr_history.push_back(line.clone());
1198                guard.stderr_queue.push_back(line.clone());
1199            }
1200        }
1201        let event = StreamEvent { stream, line };
1202        guard.combined_history_bytes += line_len;
1203        guard.combined_history.push_back(event.clone());
1204        guard.combined_queue.push_back(event);
1205    }
1206    shared.condvar.notify_all();
1207}
1208
1209fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1210    if chunk.is_empty() {
1211        return;
1212    }
1213    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1214    match stream {
1215        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1216        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1217    }
1218}
1219
1220/// Run a command to completion while concurrently draining stdout and stderr.
1221///
1222/// The helper forces capture on regardless of `config.capture`, returns raw
1223/// stdout/stderr bytes, and kills the child before returning
1224/// [`ProcessError::Timeout`] when `timeout` elapses.
1225pub fn run_command(
1226    mut config: ProcessConfig,
1227    timeout: Option<Duration>,
1228) -> Result<RunOutput, ProcessError> {
1229    config.capture = true;
1230    let process = NativeProcess::new(config);
1231    process.start()?;
1232
1233    let exit_code = match process.wait(timeout) {
1234        Ok(code) => code,
1235        Err(ProcessError::Timeout) => {
1236            match process.kill() {
1237                Ok(()) | Err(ProcessError::NotRunning) => {}
1238                Err(error) => return Err(error),
1239            }
1240            return Err(ProcessError::Timeout);
1241        }
1242        Err(error) => return Err(error),
1243    };
1244
1245    Ok(RunOutput {
1246        stdout: process.captured_stdout_raw(),
1247        stderr: process.captured_stderr_raw(),
1248        exit_code,
1249    })
1250}
1251
/// Build a `Command` that runs `command` through the platform shell.
///
/// - Windows: `cmd /D /S /C "<command>"`. The command text is passed as raw
///   arguments so cmd's own quoting rules apply unchanged.
/// - Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1270
1271#[cfg(test)]
1272mod tests;