Skip to main content

running_process/
lib.rs

1//! Cross-platform process execution, process-tree control, PTY handling, and
2//! broker integration primitives.
3//!
4//! The crate exposes a synchronous process API through [`NativeProcess`], a
5//! contained process-group helper through [`ContainedProcessGroup`], low-level
6//! spawn helpers through [`spawn()`] and [`spawn_daemon`], and optional
7//! daemon/broker modules behind feature flags.
8
9use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17use crate::observer::ObserverEmitter;
18
19pub mod console_detect;
20pub mod containment;
21mod helpers;
22// Phase 1 of #221: process-observation capability model + portable
23// lifecycle baseline. Core-feature-clean (std-only: mpsc + SystemTime),
24// so the started/exited baseline is available to the base library
25// without pulling in the daemon runtime.
26pub mod observer;
27#[cfg(feature = "originator-scan")]
28pub mod originator;
29// Wave 3+4 of #165: proto module + IPC client absorbed from the
30// former `running-process-proto` and `running-process-client` crates.
31// Both gated behind `feature = "client"`. The protobuf package
32// `running_process.daemon.v1` compiles to the file referenced below.
#[cfg(feature = "client")]
/// Prost-generated daemon protocol types used by the client transport.
///
/// Only compiled when the `client` feature is enabled.
pub mod proto {
    /// Generated Rust bindings for the `running_process.daemon.v1` protobuf package.
    // The generated items carry no hand-written docs, so `missing_docs` is
    // allowed for this module only.
    #[allow(missing_docs)]
    pub mod daemon {
        // Splices in the file the build script writes into `OUT_DIR`
        // (presumably generated by prost-build from the `.proto` sources —
        // confirm in build.rs).
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
42
43#[cfg(feature = "client")]
44pub mod client;
45
46// Phase 0 of #228: v1 broker module — prost-generated wire types from
47// `proto/broker_v1_*.proto`. Gated behind `feature = "client"` because
48// prost itself is optional under that feature. Schemas are
49// FROZEN FOREVER once v1.0 ships.
50#[cfg(feature = "client")]
51pub mod broker;
52
53// Phase 1 of #228 (issue #230): maintenance subcommands exposed via
54// the `runpm` CLI. Currently just `release-handles` — a cross-platform
55// scaffold for the Windows worktree-teardown handle-race fix
56// (soldr#710). Gated behind `feature = "client"` because the CLI that
57// drives it is.
58#[cfg(feature = "client")]
59pub mod maintenance;
60
61#[cfg(feature = "client")]
62pub mod cleanup;
63
64// #415: consumer-consumable conformance test kit. Gated behind the
65// off-by-default `test-support` cargo feature (which implies `client`)
66// so the helpers ship in the published crate but only compile when a
67// consumer opts in as a dev-dependency.
68#[cfg(feature = "test-support")]
69pub mod test_support;
70
71// Lightweight tee sink primitives for callers that want transcript/log
72// fan-out without pulling in the full daemon runtime.
73#[cfg(feature = "telemetry")]
74#[path = "daemon/telemetry.rs"]
75pub mod telemetry;
76
77// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
78// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
79#[cfg(feature = "daemon")]
80/// Daemon runtime APIs and helpers enabled by the `daemon` feature.
81pub mod daemon;
82/// PTY-backed process APIs.
83pub mod pty;
84mod public_symbols;
85mod rust_debug;
86pub mod spawn;
87pub mod systemd_killmode;
88pub mod terminal_graphics;
89mod types;
90#[cfg(unix)]
91mod unix;
92#[cfg(windows)]
93mod windows;
94
95pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
96pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
97pub use observer::{
98    CapabilitySupport, CategoryCapability, EventCategory, ObserverCapabilities, ObserverConfig,
99    ObserverEvent, ObserverEventKind, ObserverSubscriber,
100};
101#[cfg(feature = "originator-scan")]
102pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
103pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
104pub use spawn::{
105    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
106    StdioSource,
107};
108pub use terminal_graphics::{
109    current_terminal_capabilities, current_terminal_capabilities_with_timeout,
110    detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
111    GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
112    TerminalProbeEvidence,
113};
114pub use types::{
115    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
116    StreamEvent, StreamKind,
117};
118
119pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
120#[cfg(unix)]
121pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
122#[cfg(windows)]
123pub(crate) use windows::{
124    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
125    WindowsJobHandle,
126};
127
#[macro_export]
/// Create a scoped Rust debug trace label for the current function body.
///
/// Expands to a `let` binding holding a [`RustDebugScopeGuard`] created from
/// `$label` plus the invocation site's `file!()` and `line!()`. The guard
/// lives until the end of the enclosing block.
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        // Deliberately bound to a named (underscore-prefixed) variable: a bare
        // `_` pattern would drop the guard immediately instead of at the end
        // of the enclosing scope.
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
136
137#[derive(Default)]
138struct QueueState {
139    stdout_queue: VecDeque<Vec<u8>>,
140    stderr_queue: VecDeque<Vec<u8>>,
141    combined_queue: VecDeque<StreamEvent>,
142    stdout_history: VecDeque<Vec<u8>>,
143    stderr_history: VecDeque<Vec<u8>>,
144    combined_history: VecDeque<StreamEvent>,
145    stdout_raw: Vec<u8>,
146    stderr_raw: Vec<u8>,
147    stdout_history_bytes: usize,
148    stderr_history_bytes: usize,
149    combined_history_bytes: usize,
150    stdout_closed: bool,
151    stderr_closed: bool,
152}
153
/// Sentinel value for returncode atomic: process has not exited yet.
///
/// Real exit codes are `i32` values widened to `i64`, so `i64::MIN` can
/// never collide with a genuine exit status.
const RETURNCODE_NOT_SET: i64 = i64::MIN;

/// State shared through an `Arc` between a [`NativeProcess`] and its
/// background threads (e.g. the exit-waiter thread).
struct SharedState {
    /// Capture buffers and per-stream EOF flags.
    queues: Mutex<QueueState>,
    /// Paired with `queues`. Waited on by `wait` and `read_stream`, and
    /// notified by the exit waiter after it records the exit code.
    condvar: Condvar,
    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
    /// Updated by a background waiter thread — reading is lock-free.
    /// `poll`/`kill` also record the exit via `set_returncode` (presumably a
    /// store to this atomic — confirm).
    returncode: AtomicI64,
    /// Phase 1 of #221: optional lifecycle-event emitter. `None` means
    /// observation is off (the off-by-default path), so the lifecycle
    /// hooks are inert. When `Some`, `started` is emitted once at spawn
    /// and `exited` exactly once on the first returncode transition.
    observer: Option<ObserverEmitter>,
    /// Guards against emitting more than one `exited` event when several
    /// code paths (waiter thread, `poll`, `kill`) race to record the exit.
    observer_exit_emitted: AtomicBool,
}
172
/// The spawned child plus any per-platform resources that must live exactly
/// as long as it does.
struct ChildState {
    /// The std handle for the spawned child.
    child: Child,
    /// Windows job object the child was assigned to at spawn time; held only
    /// to keep the job alive for the child's lifetime. NOTE(review): presumably
    /// the job is kill-on-close (see
    /// `assign_child_to_windows_kill_on_close_job_impl`), so dropping this
    /// state tears down the process tree — confirm before reordering fields.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
178
179impl SharedState {
180    fn new(capture: bool) -> Self {
181        Self::with_observer(capture, None)
182    }
183
184    fn with_observer(capture: bool, observer: Option<ObserverEmitter>) -> Self {
185        let queues = QueueState {
186            stdout_closed: !capture,
187            stderr_closed: !capture,
188            ..QueueState::default()
189        };
190        Self {
191            queues: Mutex::new(queues),
192            condvar: Condvar::new(),
193            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
194            observer,
195            observer_exit_emitted: AtomicBool::new(false),
196        }
197    }
198
199    /// Emit the lifecycle `exited` event exactly once, regardless of which
200    /// code path first observes the exit. No-op when observation is off.
201    fn emit_exited(&self, pid: u32, exit_code: i32) {
202        let Some(emitter) = self.observer.as_ref() else {
203            return;
204        };
205        if self
206            .observer_exit_emitted
207            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
208            .is_ok()
209        {
210            emitter.emit_exited(pid, exit_code);
211        }
212    }
213}
214
/// A cross-platform child process with optional output capture.
///
/// `NativeProcess` wraps [`std::process::Command`] with the crate's
/// process-tree containment, capture draining, timeout, and terminal-control
/// behavior. Methods are synchronous and are safe to call from ordinary
/// blocking code.
pub struct NativeProcess {
    /// Spawn, stdin, and capture settings supplied at construction.
    config: ProcessConfig,
    /// The spawned child; `None` until [`Self::start`] succeeds. Shared with
    /// the background exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues, cached exit code, and observer state shared with
    /// background threads.
    shared: Arc<SharedState>,
    /// Raw stdout/stderr pipe handles recorded when capture starts (Windows
    /// only). NOTE(review): presumably consumed by `cancel_capture_io` to
    /// interrupt blocked reads on kill — confirm.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
228
229impl NativeProcess {
230    /// Create a process wrapper from a [`ProcessConfig`].
231    ///
232    /// The child is not spawned until [`Self::start`] is called. Process
233    /// observation is **off by default**: no lifecycle events are emitted
234    /// unless [`Self::with_observer`] is used instead.
235    pub fn new(config: ProcessConfig) -> Self {
236        Self::new_with_observer(config, None)
237    }
238
239    /// Create a process wrapper with process observation enabled (Phase 1
240    /// of #221).
241    ///
242    /// Returns the wrapper paired with an [`ObserverSubscriber`] that
243    /// receives a [`started`](crate::ObserverEventKind::Started) event when
244    /// [`Self::start`] spawns the child and exactly one
245    /// [`exited`](crate::ObserverEventKind::Exited) event when the child is
246    /// reaped — for the categories the `config` requests that are actually
247    /// `Supported` (only [`Lifecycle`](crate::EventCategory::Lifecycle) in
248    /// Phase 1; see [`ObserverCapabilities::negotiate`](crate::ObserverCapabilities::negotiate)).
249    ///
250    /// The emitter never blocks on a slow or dropped subscriber.
251    pub fn with_observer(
252        config: ProcessConfig,
253        observer: crate::observer::ObserverConfig,
254    ) -> (Self, ObserverSubscriber) {
255        let (emitter, subscriber) = ObserverEmitter::new(observer);
256        let process = Self::new_with_observer(config, Some(emitter));
257        (process, subscriber)
258    }
259
260    fn new_with_observer(config: ProcessConfig, observer: Option<ObserverEmitter>) -> Self {
261        let shared = match observer {
262            // Off-by-default path: no emitter, no observation state.
263            None => SharedState::new(config.capture),
264            Some(emitter) => SharedState::with_observer(config.capture, Some(emitter)),
265        };
266        Self {
267            shared: Arc::new(shared),
268            child: Arc::new(Mutex::new(None)),
269            config,
270            #[cfg(windows)]
271            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
272        }
273    }
274
275    // Preserve a stable Rust frame here in release user dumps.
276    #[inline(never)]
277    /// Spawn the configured child process.
278    ///
279    /// Returns [`ProcessError::AlreadyStarted`] if the same wrapper already
280    /// owns a running child.
281    pub fn start(&self) -> Result<(), ProcessError> {
282        public_symbols::rp_native_process_start_public(self)
283    }
284
285    fn start_impl(&self) -> Result<(), ProcessError> {
286        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
287        let mut guard = self.child.lock().expect("child mutex poisoned");
288        if guard.is_some() {
289            return Err(ProcessError::AlreadyStarted);
290        }
291
292        let mut command = self.build_command();
293        match self.config.stdin_mode {
294            StdinMode::Inherit => {}
295            StdinMode::Piped => {
296                command.stdin(Stdio::piped());
297            }
298            StdinMode::Null => {
299                command.stdin(Stdio::null());
300            }
301        }
302        if self.config.capture {
303            command.stdout(Stdio::piped());
304            command.stderr(Stdio::piped());
305        }
306
307        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
308        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
309        // Phase 1 of #221: emit the lifecycle `started` event. No-op when
310        // observation is off (the common, off-by-default path).
311        if let Some(emitter) = self.shared.observer.as_ref() {
312            emitter.emit_started(child.id());
313        }
314        #[cfg(windows)]
315        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
316            .map_err(ProcessError::Spawn)?;
317        if self.config.capture {
318            let stdout = child.stdout.take().expect("stdout pipe missing");
319            let stderr = child.stderr.take().expect("stderr pipe missing");
320            #[cfg(windows)]
321            {
322                use std::os::windows::io::AsRawHandle;
323                let mut handles = self
324                    .capture_pipe_handles
325                    .lock()
326                    .expect("capture pipe handles mutex poisoned");
327                handles.stdout = Some(stdout.as_raw_handle() as usize);
328                handles.stderr = Some(stderr.as_raw_handle() as usize);
329            }
330            self.spawn_reader(
331                stdout,
332                StreamKind::Stdout,
333                StreamKind::Stdout,
334                self.pipe_done_callback(StreamKind::Stdout),
335            );
336            self.spawn_reader(
337                stderr,
338                StreamKind::Stderr,
339                match self.config.stderr_mode {
340                    StderrMode::Stdout => StreamKind::Stdout,
341                    StderrMode::Pipe => StreamKind::Stderr,
342                },
343                self.pipe_done_callback(StreamKind::Stderr),
344            );
345        }
346        *guard = Some(ChildState {
347            child,
348            #[cfg(windows)]
349            _job: job,
350        });
351        drop(guard);
352        self.spawn_exit_waiter();
353        Ok(())
354    }
355
356    /// Background thread that polls for process exit and stores the exit code
357    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
358    fn spawn_exit_waiter(&self) {
359        let child = Arc::clone(&self.child);
360        let shared = Arc::clone(&self.shared);
361        thread::spawn(move || {
362            loop {
363                if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
364                    return;
365                }
366                {
367                    let mut guard = child.lock().expect("child mutex poisoned");
368                    if let Some(child_state) = guard.as_mut() {
369                        let pid = child_state.child.id();
370                        match child_state.child.try_wait() {
371                            Ok(Some(status)) => {
372                                let code = exit_code(status);
373                                shared.returncode.store(code as i64, Ordering::Release);
374                                // Phase 1 of #221: lifecycle `exited`. Emit
375                                // before notifying waiters and is guarded so
376                                // only the first exit-observer fires.
377                                shared.emit_exited(pid, code);
378                                shared.condvar.notify_all();
379                                return;
380                            }
381                            Ok(None) => {}
382                            Err(_) => return,
383                        }
384                    } else {
385                        return;
386                    }
387                }
388                // #199: intentional — capture thread polling for
389                // child-exit. `try_wait` is non-blocking by design;
390                // we can't block here because the thread also drains
391                // pipe state alongside the exit check. 10ms keeps the
392                // CPU cost negligible while staying responsive.
393                thread::sleep(Duration::from_millis(10));
394            }
395        });
396    }
397
398    /// Write bytes to the child's stdin and then close stdin.
399    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
400        let mut guard = self.child.lock().expect("child mutex poisoned");
401        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
402        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
403        use std::io::Write;
404        stdin.write_all(data).map_err(ProcessError::Io)?;
405        stdin.flush().map_err(ProcessError::Io)?;
406        drop(child.stdin.take());
407        Ok(())
408    }
409
410    /// Write to the child's stdin without closing it afterwards, so the
411    /// caller can issue additional writes. Used by interactive
412    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
413    /// stdin open across multiple client input frames.
414    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
415        let mut guard = self.child.lock().expect("child mutex poisoned");
416        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
417        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
418        use std::io::Write;
419        stdin.write_all(data).map_err(ProcessError::Io)?;
420        stdin.flush().map_err(ProcessError::Io)?;
421        Ok(())
422    }
423
424    /// Explicitly close the child's stdin (signals EOF to the child).
425    /// Idempotent: returns Ok if stdin was already closed.
426    pub fn close_stdin(&self) -> Result<(), ProcessError> {
427        let mut guard = self.child.lock().expect("child mutex poisoned");
428        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
429        drop(child.stdin.take());
430        Ok(())
431    }
432
433    /// Check whether the child has exited without blocking.
434    ///
435    /// Returns `Ok(None)` while the process is still running.
436    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
437        // Fast path: check atomic set by background waiter thread.
438        if let Some(code) = self.returncode() {
439            return Ok(Some(code));
440        }
441        let mut guard = self.child.lock().expect("child mutex poisoned");
442        let Some(child_state) = guard.as_mut() else {
443            return Ok(self.returncode());
444        };
445        let pid = child_state.child.id();
446        let child = &mut child_state.child;
447        let status = child.try_wait().map_err(ProcessError::Io)?;
448        if let Some(status) = status {
449            let code = exit_code(status);
450            self.set_returncode(code);
451            self.shared.emit_exited(pid, code);
452            return Ok(Some(code));
453        }
454        Ok(None)
455    }
456
457    // Preserve a stable Rust frame here in release user dumps.
458    #[inline(never)]
459    /// Wait for the child to exit.
460    ///
461    /// When `timeout` is `Some`, returns [`ProcessError::Timeout`] if the
462    /// child does not exit before the duration elapses.
463    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
464        public_symbols::rp_native_process_wait_public(self, timeout)
465    }
466
467    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
468        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
469        if self.child.lock().expect("child mutex poisoned").is_none() {
470            return self.returncode().ok_or(ProcessError::NotRunning);
471        }
472        // Fast path: already exited.
473        if let Some(code) = self.returncode() {
474            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
475            return Ok(code);
476        }
477        let start = Instant::now();
478        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
479        loop {
480            // Check returncode (set by exit-waiter thread via atomic + condvar).
481            let rc = self.shared.returncode.load(Ordering::Acquire);
482            if rc != RETURNCODE_NOT_SET {
483                drop(guard);
484                let code = rc as i32;
485                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
486                return Ok(code);
487            }
488            if let Some(limit) = timeout {
489                let elapsed = start.elapsed();
490                if elapsed >= limit {
491                    return Err(ProcessError::Timeout);
492                }
493                let remaining = limit - elapsed;
494                // Wait on condvar with timeout, capped at 50ms to recheck.
495                let wait_time = remaining.min(Duration::from_millis(50));
496                guard = self
497                    .shared
498                    .condvar
499                    .wait_timeout(guard, wait_time)
500                    .expect("queue mutex poisoned")
501                    .0;
502            } else {
503                // Wait on condvar with periodic recheck.
504                guard = self
505                    .shared
506                    .condvar
507                    .wait_timeout(guard, Duration::from_millis(50))
508                    .expect("queue mutex poisoned")
509                    .0;
510            }
511        }
512    }
513
514    // Preserve a stable Rust frame here in release user dumps.
515    #[inline(never)]
516    /// Forcefully terminate the child process.
517    pub fn kill(&self) -> Result<(), ProcessError> {
518        public_symbols::rp_native_process_kill_public(self)
519    }
520
521    fn kill_impl(&self) -> Result<(), ProcessError> {
522        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
523        {
524            let mut guard = self.child.lock().expect("child mutex poisoned");
525            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
526            let pid = child.id();
527            child.kill().map_err(ProcessError::Io)?;
528            let status = child.wait().map_err(ProcessError::Io)?;
529            let code = exit_code(status);
530            self.set_returncode(code);
531            // Phase 1 of #221: a killed child still produces a lifecycle
532            // `exited` event (guarded against double-emit by the waiter).
533            self.shared.emit_exited(pid, code);
534        }
535        // On Windows, interrupt any pending blocking `read()` in the
536        // per-stream reader threads so they fall out of their loops
537        // immediately. This is what makes the grandchild-pipe-orphan
538        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
539        // that inherits the pipe and outlives uv) wake up in
540        // microseconds instead of waiting for the bounded-drain
541        // safety-net deadline below.
542        #[cfg(windows)]
543        self.cancel_capture_io();
544        // Synchronize with the per-stream reader threads so that by the
545        // time kill() returns, the capture queues have flipped from
546        // "blocked on read" to "closed" and downstream pollers (e.g.
547        // take_combined_line) observe EOS instead of timeout. Without
548        // this, callers that hit a wait()-timeout path see Python code
549        // raise TimeoutError, kill the child, then race the reader
550        // threads — a 10ms poll loop can miss the EOS flip entirely.
551        //
552        // The deadline is the safety-net: on Windows `CancelIoEx`
553        // above almost always wakes the readers first; on POSIX, and
554        // in any pathological Windows case where `CancelIoEx` doesn't
555        // fire, the deadline guarantees `kill()` still returns.
556        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
557            self,
558            kill_drain_deadline(),
559        );
560        Ok(())
561    }
562
563    /// Terminate the child process.
564    ///
565    /// This currently uses the same hard-kill path as [`Self::kill`].
566    pub fn terminate(&self) -> Result<(), ProcessError> {
567        self.kill()
568    }
569
570    /// Send the OS-appropriate soft termination signal to the child's
571    /// process group (POSIX: SIGTERM to `-pid`; Windows: no soft path
572    /// implemented yet — returns Ok without doing anything so callers
573    /// can run the same code on both platforms and rely on the post-
574    /// grace hard kill).
575    ///
576    /// Requires `ProcessConfig.create_process_group=true` on POSIX so
577    /// that `-pid` resolves to the child's own group. With the default
578    /// `create_process_group=false`, the kill would walk back to the
579    /// caller's group; the method silently no-ops in that case to avoid
580    /// signaling the wrong tree.
581    ///
582    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
583    /// that `TerminationOutcome::SoftExit` becomes meaningful on POSIX.
584    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
585        #[cfg(unix)]
586        {
587            if !self.config.create_process_group {
588                return Ok(());
589            }
590            let pid = match self.pid() {
591                Some(p) => p as i32,
592                None => return Err(ProcessError::NotRunning),
593            };
594            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
595            if result != 0 {
596                let err = std::io::Error::last_os_error();
597                if err.raw_os_error() != Some(libc::ESRCH) {
598                    return Err(ProcessError::Io(err));
599                }
600            }
601            Ok(())
602        }
603        #[cfg(windows)]
604        {
605            if !self.config.create_process_group {
606                // GenerateConsoleCtrlEvent only routes to children
607                // spawned with CREATE_NEW_PROCESS_GROUP, and the
608                // event would otherwise hit the daemon's own group.
609                // No-op so the hard-kill schedule still wins.
610                return Ok(());
611            }
612            let pid = match self.pid() {
613                Some(p) => p,
614                None => return Err(ProcessError::NotRunning),
615            };
616            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
617            // SAFETY: the FFI call is the standard Windows API; no
618            // borrowed Rust state is involved.
619            let ok = unsafe {
620                winapi::um::wincon::GenerateConsoleCtrlEvent(
621                    winapi::um::wincon::CTRL_BREAK_EVENT,
622                    pid,
623                )
624            };
625            if ok == 0 {
626                let err = std::io::Error::last_os_error();
627                // ERROR_INVALID_HANDLE means the child has already
628                // exited or has detached from the console — treat as
629                // success because the soft step's only goal is to
630                // give the child a chance to exit cleanly, and a
631                // dead/detached child does not need one.
632                if err.raw_os_error() != Some(6) {
633                    return Err(ProcessError::Io(err));
634                }
635            }
636            Ok(())
637        }
638    }
639
640    // Preserve a stable Rust frame here in release user dumps.
641    #[inline(never)]
642    /// Close the process wrapper by terminating the child when it is running.
643    pub fn close(&self) -> Result<(), ProcessError> {
644        public_symbols::rp_native_process_close_public(self)
645    }
646
647    fn close_impl(&self) -> Result<(), ProcessError> {
648        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
649        if self.child.lock().expect("child mutex poisoned").is_none() {
650            return Ok(());
651        }
652        if self.poll()?.is_none() {
653            self.kill()?;
654        } else {
655            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
656        }
657        Ok(())
658    }
659
660    /// Return the child process id when the wrapper currently owns a child.
661    pub fn pid(&self) -> Option<u32> {
662        self.child
663            .lock()
664            .expect("child mutex poisoned")
665            .as_ref()
666            .map(|state| state.child.id())
667    }
668
669    /// Return the cached exit code when the child has exited.
670    pub fn returncode(&self) -> Option<i32> {
671        let v = self.shared.returncode.load(Ordering::Acquire);
672        if v == RETURNCODE_NOT_SET {
673            None
674        } else {
675            Some(v as i32)
676        }
677    }
678
679    /// Return whether captured output is queued for one stream.
680    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
681        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
682            return false;
683        }
684        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
685        match stream {
686            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
687            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
688        }
689    }
690
691    /// Return whether captured combined output is queued.
692    pub fn has_pending_combined(&self) -> bool {
693        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
694        !guard.combined_queue.is_empty()
695    }
696
697    /// Drain and return all queued output for one stream.
698    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
699        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
700            return Vec::new();
701        }
702        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
703        let queue = match stream {
704            StreamKind::Stdout => &mut guard.stdout_queue,
705            StreamKind::Stderr => &mut guard.stderr_queue,
706        };
707        queue.drain(..).collect()
708    }
709
710    /// Drain and return all queued combined output events.
711    pub fn drain_combined(&self) -> Vec<StreamEvent> {
712        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
713        guard.combined_queue.drain(..).collect()
714    }
715
716    /// Read the next captured chunk from one stream.
717    ///
718    /// Returns [`ReadStatus::Timeout`] when `timeout` elapses before output or
719    /// EOF is observed.
720    pub fn read_stream(
721        &self,
722        stream: StreamKind,
723        timeout: Option<Duration>,
724    ) -> ReadStatus<Vec<u8>> {
725        let deadline = timeout.map(|limit| Instant::now() + limit);
726        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
727
728        loop {
729            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
730                return ReadStatus::Eof;
731            }
732
733            let queue = match stream {
734                StreamKind::Stdout => &mut guard.stdout_queue,
735                StreamKind::Stderr => &mut guard.stderr_queue,
736            };
737            if let Some(line) = queue.pop_front() {
738                return ReadStatus::Line(line);
739            }
740
741            let closed = match stream {
742                StreamKind::Stdout => {
743                    if self.config.stderr_mode == StderrMode::Stdout {
744                        guard.stdout_closed && guard.stderr_closed
745                    } else {
746                        guard.stdout_closed
747                    }
748                }
749                StreamKind::Stderr => guard.stderr_closed,
750            };
751            if closed {
752                return ReadStatus::Eof;
753            }
754
755            match deadline {
756                Some(deadline) => {
757                    let now = Instant::now();
758                    if now >= deadline {
759                        return ReadStatus::Timeout;
760                    }
761                    let wait = deadline.saturating_duration_since(now);
762                    let result = self
763                        .shared
764                        .condvar
765                        .wait_timeout(guard, wait)
766                        .expect("queue mutex poisoned");
767                    guard = result.0;
768                    if result.1.timed_out() {
769                        return ReadStatus::Timeout;
770                    }
771                }
772                None => {
773                    guard = self
774                        .shared
775                        .condvar
776                        .wait(guard)
777                        .expect("queue mutex poisoned");
778                }
779            }
780        }
781    }
782
    // Preserve a stable Rust frame here in release user dumps: `#[inline(never)]`
    // keeps this method from being inlined into its callers, and the body
    // forwards through the `public_symbols` shim rather than doing the work inline.
    #[inline(never)]
    /// Read the next captured combined stream event.
    ///
    /// `timeout` is forwarded unchanged to
    /// `public_symbols::rp_native_process_read_combined_public`.
    /// NOTE(review): that shim presumably ends up in `read_combined_impl`,
    /// which returns the next queued event, `ReadStatus::Eof` once both
    /// streams are closed and drained, or `ReadStatus::Timeout` when `timeout`
    /// elapses (`None` = no deadline). Confirm against `public_symbols`.
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
789
790    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
791        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
792        let deadline = timeout.map(|limit| Instant::now() + limit);
793        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
794
795        loop {
796            if let Some(event) = guard.combined_queue.pop_front() {
797                return ReadStatus::Line(event);
798            }
799            if guard.stdout_closed && guard.stderr_closed {
800                return ReadStatus::Eof;
801            }
802
803            match deadline {
804                Some(deadline) => {
805                    let now = Instant::now();
806                    if now >= deadline {
807                        return ReadStatus::Timeout;
808                    }
809                    let wait = deadline.saturating_duration_since(now);
810                    let result = self
811                        .shared
812                        .condvar
813                        .wait_timeout(guard, wait)
814                        .expect("queue mutex poisoned");
815                    guard = result.0;
816                    if result.1.timed_out() {
817                        return ReadStatus::Timeout;
818                    }
819                }
820                None => {
821                    guard = self
822                        .shared
823                        .condvar
824                        .wait(guard)
825                        .expect("queue mutex poisoned");
826                }
827            }
828        }
829    }
830
831    /// Return the retained stdout history.
832    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
833        self.shared
834            .queues
835            .lock()
836            .expect("queue mutex poisoned")
837            .stdout_history
838            .clone()
839            .into_iter()
840            .collect()
841    }
842
843    fn captured_stdout_raw(&self) -> Vec<u8> {
844        self.shared
845            .queues
846            .lock()
847            .expect("queue mutex poisoned")
848            .stdout_raw
849            .clone()
850    }
851
852    /// Return the retained stderr history.
853    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
854        if self.config.stderr_mode == StderrMode::Stdout {
855            return Vec::new();
856        }
857        self.shared
858            .queues
859            .lock()
860            .expect("queue mutex poisoned")
861            .stderr_history
862            .clone()
863            .into_iter()
864            .collect()
865    }
866
867    fn captured_stderr_raw(&self) -> Vec<u8> {
868        if self.config.stderr_mode == StderrMode::Stdout {
869            return Vec::new();
870        }
871        self.shared
872            .queues
873            .lock()
874            .expect("queue mutex poisoned")
875            .stderr_raw
876            .clone()
877    }
878
879    /// Return the retained combined stdout/stderr event history.
880    pub fn captured_combined(&self) -> Vec<StreamEvent> {
881        self.shared
882            .queues
883            .lock()
884            .expect("queue mutex poisoned")
885            .combined_history
886            .clone()
887            .into_iter()
888            .collect()
889    }
890
891    /// Return the retained byte count for one captured stream.
892    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
893        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
894            return 0;
895        }
896        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
897        match stream {
898            StreamKind::Stdout => guard.stdout_history_bytes,
899            StreamKind::Stderr => guard.stderr_history_bytes,
900        }
901    }
902
903    /// Return the retained byte count for combined captured output.
904    pub fn captured_combined_bytes(&self) -> usize {
905        self.shared
906            .queues
907            .lock()
908            .expect("queue mutex poisoned")
909            .combined_history_bytes
910    }
911
912    /// Clear retained output history for one stream and return freed bytes.
913    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
914        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
915            return 0;
916        }
917        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
918        match stream {
919            StreamKind::Stdout => {
920                let released = guard.stdout_history_bytes;
921                guard.stdout_history.clear();
922                guard.stdout_raw.clear();
923                guard.stdout_history_bytes = 0;
924                released
925            }
926            StreamKind::Stderr => {
927                let released = guard.stderr_history_bytes;
928                guard.stderr_history.clear();
929                guard.stderr_raw.clear();
930                guard.stderr_history_bytes = 0;
931                released
932            }
933        }
934    }
935
936    /// Clear retained combined output history and return freed bytes.
937    pub fn clear_captured_combined(&self) -> usize {
938        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
939        let released = guard.combined_history_bytes;
940        guard.combined_history.clear();
941        guard.combined_history_bytes = 0;
942        released
943    }
944
945    fn build_command(&self) -> Command {
946        let mut command = match &self.config.command {
947            CommandSpec::Shell(command) => shell_command(command),
948            CommandSpec::Argv(argv) => {
949                let mut command = Command::new(&argv[0]);
950                if argv.len() > 1 {
951                    command.args(&argv[1..]);
952                }
953                command
954            }
955        };
956        if let Some(cwd) = &self.config.cwd {
957            command.current_dir(cwd);
958        }
959        if let Some(env) = &self.config.env {
960            command.env_clear();
961            command.envs(env.iter().map(|(k, v)| (k, v)));
962        }
963        #[cfg(windows)]
964        {
965            use std::os::windows::process::CommandExt;
966
967            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
968            // with CTRL_BREAK_EVENT route to this child's group
969            // (rather than the daemon's group) — required for the
970            // pipe-session soft-signal path on Windows (#130 M4).
971            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
972            let extra = if self.config.create_process_group {
973                CREATE_NEW_PROCESS_GROUP
974            } else {
975                0
976            };
977            let flags = self.config.creationflags.unwrap_or(0)
978                | extra
979                | windows_priority_flags(self.config.nice);
980            if flags != 0 {
981                command.creation_flags(flags);
982            }
983        }
984        #[cfg(unix)]
985        {
986            let create_process_group = self.config.create_process_group;
987            let nice = self.config.nice;
988
989            if create_process_group || nice.is_some() {
990                use std::os::unix::process::CommandExt;
991
992                unsafe {
993                    command.pre_exec(move || {
994                        if create_process_group && libc::setpgid(0, 0) == -1 {
995                            return Err(std::io::Error::last_os_error());
996                        }
997                        if let Some(nice) = nice {
998                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
999                            if result == -1 {
1000                                return Err(std::io::Error::last_os_error());
1001                            }
1002                        }
1003                        Ok(())
1004                    });
1005                }
1006            }
1007        }
1008        command
1009    }
1010
1011    fn spawn_reader<R>(
1012        &self,
1013        pipe: R,
1014        source_stream: StreamKind,
1015        visible_stream: StreamKind,
1016        on_pipe_done: Box<dyn FnOnce() + Send>,
1017    ) where
1018        R: Read + Send + 'static,
1019    {
1020        let shared = Arc::clone(&self.shared);
1021        thread::spawn(move || {
1022            let mut reader = pipe;
1023            let mut chunk = vec![0_u8; 65536];
1024            let mut pending = Vec::new();
1025
1026            loop {
1027                match reader.read(&mut chunk) {
1028                    Ok(0) => break,
1029                    Ok(n) => {
1030                        append_raw(&shared, visible_stream, &chunk[..n]);
1031                        let lines = feed_chunk(&mut pending, &chunk[..n]);
1032                        emit_lines(&shared, visible_stream, lines);
1033                    }
1034                    Err(_) => break,
1035                }
1036            }
1037
1038            if !pending.is_empty() {
1039                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
1040            }
1041
1042            // Clear the parent-side pipe-handle slot under its mutex
1043            // before dropping the reader. After this returns,
1044            // `kill_impl` can no longer try to `CancelIoEx` on us, so
1045            // it's safe for `reader`'s drop to close the HANDLE.
1046            on_pipe_done();
1047            drop(reader);
1048
1049            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1050            match source_stream {
1051                StreamKind::Stdout => guard.stdout_closed = true,
1052                StreamKind::Stderr => guard.stderr_closed = true,
1053            }
1054            shared.condvar.notify_all();
1055        });
1056    }
1057
1058    #[cfg(windows)]
1059    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1060        let handles = Arc::clone(&self.capture_pipe_handles);
1061        Box::new(move || {
1062            let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
1063            match stream {
1064                StreamKind::Stdout => guard.stdout = None,
1065                StreamKind::Stderr => guard.stderr = None,
1066            }
1067        })
1068    }
1069
1070    #[cfg(not(windows))]
1071    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1072        Box::new(|| {})
1073    }
1074
1075    /// Cancel any pending blocking `read()` on the parent-side capture
1076    /// pipes so the reader threads' `read()` calls return
1077    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
1078    /// break the grandchild-orphan deadlock without waiting on
1079    /// `wait_for_capture_completion_with_deadline`'s safety-net.
1080    #[cfg(windows)]
1081    fn cancel_capture_io(&self) {
1082        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
1083        use winapi::shared::ntdef::HANDLE;
1084        use winapi::um::ioapiset::CancelIoEx;
1085        let guard = self
1086            .capture_pipe_handles
1087            .lock()
1088            .expect("capture pipe handles mutex poisoned");
1089        if let Some(h) = guard.stdout {
1090            // SAFETY: the slot is `Some` only while the owning reader
1091            // thread still holds the `ChildStdout`, so the HANDLE is
1092            // valid for the duration of this call. The reader is
1093            // blocked in `lock()` on the same mutex if it's racing us
1094            // toward exit, so it cannot drop the pipe and close the
1095            // HANDLE until we return.
1096            unsafe {
1097                CancelIoEx(h as HANDLE, std::ptr::null_mut());
1098            }
1099        }
1100        if let Some(h) = guard.stderr {
1101            unsafe {
1102                CancelIoEx(h as HANDLE, std::ptr::null_mut());
1103            }
1104        }
1105    }
1106
1107    fn set_returncode(&self, code: i32) {
1108        self.shared.returncode.store(code as i64, Ordering::Release);
1109        self.shared.condvar.notify_all();
1110    }
1111
1112    fn wait_for_capture_completion_impl(&self) {
1113        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1114        if !self.config.capture {
1115            return;
1116        }
1117
1118        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1119        while !(guard.stdout_closed && guard.stderr_closed) {
1120            guard = self
1121                .shared
1122                .condvar
1123                .wait(guard)
1124                .expect("queue mutex poisoned");
1125        }
1126    }
1127
1128    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
1129    /// Returns `true` if the reader threads flipped both closed flags on
1130    /// their own, `false` if the deadline elapsed first. On timeout the
1131    /// closed flags are force-set (and waiters notified) so downstream
1132    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
1133    /// thread that eventually unblocks after the OS releases the pipe
1134    /// will assign `closed = true` again, which is a harmless no-op.
1135    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1136        crate::rp_rust_debug_scope!(
1137            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1138        );
1139        if !self.config.capture {
1140            return true;
1141        }
1142
1143        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1144        while !(guard.stdout_closed && guard.stderr_closed) {
1145            let now = Instant::now();
1146            if now >= deadline {
1147                guard.stdout_closed = true;
1148                guard.stderr_closed = true;
1149                self.shared.condvar.notify_all();
1150                return false;
1151            }
1152            let (next_guard, result) = self
1153                .shared
1154                .condvar
1155                .wait_timeout(guard, deadline - now)
1156                .expect("queue mutex poisoned");
1157            guard = next_guard;
1158            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1159                guard.stdout_closed = true;
1160                guard.stderr_closed = true;
1161                self.shared.condvar.notify_all();
1162                return false;
1163            }
1164        }
1165        true
1166    }
1167}
1168
1169fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1170    if lines.is_empty() {
1171        return;
1172    }
1173    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1174    for line in lines {
1175        let line_len = line.len();
1176        match stream {
1177            StreamKind::Stdout => {
1178                guard.stdout_history_bytes += line_len;
1179                guard.stdout_history.push_back(line.clone());
1180                guard.stdout_queue.push_back(line.clone());
1181            }
1182            StreamKind::Stderr => {
1183                guard.stderr_history_bytes += line_len;
1184                guard.stderr_history.push_back(line.clone());
1185                guard.stderr_queue.push_back(line.clone());
1186            }
1187        }
1188        let event = StreamEvent { stream, line };
1189        guard.combined_history_bytes += line_len;
1190        guard.combined_history.push_back(event.clone());
1191        guard.combined_queue.push_back(event);
1192    }
1193    shared.condvar.notify_all();
1194}
1195
1196fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1197    if chunk.is_empty() {
1198        return;
1199    }
1200    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1201    match stream {
1202        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1203        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1204    }
1205}
1206
1207/// Run a command to completion while concurrently draining stdout and stderr.
1208///
1209/// The helper forces capture on regardless of `config.capture`, returns raw
1210/// stdout/stderr bytes, and kills the child before returning
1211/// [`ProcessError::Timeout`] when `timeout` elapses.
1212pub fn run_command(
1213    mut config: ProcessConfig,
1214    timeout: Option<Duration>,
1215) -> Result<RunOutput, ProcessError> {
1216    config.capture = true;
1217    let process = NativeProcess::new(config);
1218    process.start()?;
1219
1220    let exit_code = match process.wait(timeout) {
1221        Ok(code) => code,
1222        Err(ProcessError::Timeout) => {
1223            match process.kill() {
1224                Ok(()) | Err(ProcessError::NotRunning) => {}
1225                Err(error) => return Err(error),
1226            }
1227            return Err(ProcessError::Timeout);
1228        }
1229        Err(error) => return Err(error),
1230    };
1231
1232    Ok(RunOutput {
1233        stdout: process.captured_stdout_raw(),
1234        stderr: process.captured_stderr_raw(),
1235        exit_code,
1236    })
1237}
1238
/// Build a `Command` that runs `command` through the platform shell.
///
/// * Windows: `cmd` with the raw (unescaped) arguments `/D /S /C "`,
///   `command`, and `"`. `/D` skips AutoRun and `/S` strips the outer quotes.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell.raw_arg("/D /S /C \"").raw_arg(command).raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1257
1258#[cfg(test)]
1259mod tests;