Skip to main content

running_process/
lib.rs

1//! Cross-platform process execution, process-tree control, PTY handling, and
2//! broker integration primitives.
3//!
4//! The crate exposes a synchronous process API through [`NativeProcess`], a
5//! contained process-group helper through [`ContainedProcessGroup`], low-level
6//! spawn helpers through [`spawn()`] and [`spawn_daemon`], and optional
7//! daemon/broker modules behind feature flags.
8
9use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17pub mod console_detect;
18pub mod containment;
19mod helpers;
20#[cfg(feature = "originator-scan")]
21pub mod originator;
22// Wave 3+4 of #165: proto module + IPC client absorbed from the
23// former `running-process-proto` and `running-process-client` crates.
24// Both gated behind `feature = "client"`. The protobuf package
25// `running_process.daemon.v1` compiles to the file referenced below.
26#[cfg(feature = "client")]
27/// Prost-generated daemon protocol types used by the client transport.
28pub mod proto {
29    /// Generated Rust bindings for the `running_process.daemon.v1` protobuf package.
30    #[allow(missing_docs)]
31    pub mod daemon {
32        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
33    }
34}
35
36#[cfg(feature = "client")]
37pub mod client;
38
39// Phase 0 of #228: v1 broker module — prost-generated wire types from
40// `proto/broker_v1_*.proto`. Gated behind `feature = "client"` because
41// prost itself is optional under that feature. Schemas are
42// FROZEN FOREVER once v1.0 ships.
43#[cfg(feature = "client")]
44pub mod broker;
45
46// Phase 1 of #228 (issue #230): maintenance subcommands exposed via
47// the `runpm` CLI. Currently just `release-handles` — a cross-platform
48// scaffold for the Windows worktree-teardown handle-race fix
49// (soldr#710). Gated behind `feature = "client"` because the CLI that
50// drives it is.
51#[cfg(feature = "client")]
52pub mod maintenance;
53
54#[cfg(feature = "client")]
55pub mod cleanup;
56
57// #415: consumer-consumable conformance test kit. Gated behind the
58// off-by-default `test-support` cargo feature (which implies `client`)
59// so the helpers ship in the published crate but only compile when a
60// consumer opts in as a dev-dependency.
61#[cfg(feature = "test-support")]
62pub mod test_support;
63
64// Lightweight tee sink primitives for callers that want transcript/log
65// fan-out without pulling in the full daemon runtime.
66#[cfg(feature = "telemetry")]
67#[path = "daemon/telemetry.rs"]
68pub mod telemetry;
69
70// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
71// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
72#[cfg(feature = "daemon")]
73/// Daemon runtime APIs and helpers enabled by the `daemon` feature.
74pub mod daemon;
75/// PTY-backed process APIs.
76pub mod pty;
77mod public_symbols;
78mod rust_debug;
79pub mod spawn;
80pub mod systemd_killmode;
81pub mod terminal_graphics;
82mod types;
83#[cfg(unix)]
84mod unix;
85#[cfg(windows)]
86mod windows;
87
88pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
89pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
90#[cfg(feature = "originator-scan")]
91pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
92pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
93pub use spawn::{
94    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
95    StdioSource,
96};
97pub use terminal_graphics::{
98    current_terminal_capabilities, current_terminal_capabilities_with_timeout,
99    detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
100    GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
101    TerminalProbeEvidence,
102};
103pub use types::{
104    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
105    StreamEvent, StreamKind,
106};
107
108pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
109#[cfg(unix)]
110pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
111#[cfg(windows)]
112pub(crate) use windows::{
113    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
114    WindowsJobHandle,
115};
116
/// Open a Rust debug trace scope named `$label` for the rest of the enclosing
/// block (typically a function body).
///
/// Expands to a `let` binding that holds the value returned by
/// [`RustDebugScopeGuard`]`::enter`, passing the call site's `file!()` and
/// `line!()`. The binding is deliberately named (not `_`) so the guard is not
/// dropped immediately but lives until the end of the enclosing scope.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($scope_label:expr) => {
        let _rp_rust_debug_scope_guard =
            $crate::RustDebugScopeGuard::enter($scope_label, file!(), line!());
    };
}
125
126#[derive(Default)]
127struct QueueState {
128    stdout_queue: VecDeque<Vec<u8>>,
129    stderr_queue: VecDeque<Vec<u8>>,
130    combined_queue: VecDeque<StreamEvent>,
131    stdout_history: VecDeque<Vec<u8>>,
132    stderr_history: VecDeque<Vec<u8>>,
133    combined_history: VecDeque<StreamEvent>,
134    stdout_raw: Vec<u8>,
135    stderr_raw: Vec<u8>,
136    stdout_history_bytes: usize,
137    stderr_history_bytes: usize,
138    combined_history_bytes: usize,
139    stdout_closed: bool,
140    stderr_closed: bool,
141}
142
143/// Sentinel value for returncode atomic: process has not exited yet.
144const RETURNCODE_NOT_SET: i64 = i64::MIN;
145
146struct SharedState {
147    queues: Mutex<QueueState>,
148    condvar: Condvar,
149    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
150    /// Updated by a background waiter thread — reading is lock-free.
151    returncode: AtomicI64,
152}
153
/// The spawned child together with platform resources tied to its lifetime.
struct ChildState {
    child: Child,
    // Held only to keep the job object open while this state exists; never
    // read. Declared after `child`, so it is dropped after it. Presumably the
    // job is kill-on-close (per the name of the helper that assigns it in
    // `start`) — confirm before relying on that.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
159
160impl SharedState {
161    fn new(capture: bool) -> Self {
162        let queues = QueueState {
163            stdout_closed: !capture,
164            stderr_closed: !capture,
165            ..QueueState::default()
166        };
167        Self {
168            queues: Mutex::new(queues),
169            condvar: Condvar::new(),
170            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
171        }
172    }
173}
174
/// A cross-platform child process with optional output capture.
///
/// `NativeProcess` wraps [`std::process::Command`] with the crate's
/// process-tree containment, capture draining, timeout, and terminal-control
/// behavior. Methods are synchronous and are safe to call from ordinary
/// blocking code.
///
/// Starting a process spawns a background thread that polls for the child's
/// exit, plus one reader thread per pipe when capture is enabled.
pub struct NativeProcess {
    // Spawn and capture settings passed to `new`.
    config: ProcessConfig,
    // `None` until `start` succeeds; shared with the exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    // Capture queues, condvar, and atomic exit code shared with the
    // background threads.
    shared: Arc<SharedState>,
    // Raw stdout/stderr capture pipe handles recorded by `start`; presumably
    // consumed by `cancel_capture_io` when `kill` runs on Windows.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
188
189impl NativeProcess {
190    /// Create a process wrapper from a [`ProcessConfig`].
191    ///
192    /// The child is not spawned until [`Self::start`] is called.
193    pub fn new(config: ProcessConfig) -> Self {
194        Self {
195            shared: Arc::new(SharedState::new(config.capture)),
196            child: Arc::new(Mutex::new(None)),
197            config,
198            #[cfg(windows)]
199            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
200        }
201    }
202
203    // Preserve a stable Rust frame here in release user dumps.
204    #[inline(never)]
205    /// Spawn the configured child process.
206    ///
207    /// Returns [`ProcessError::AlreadyStarted`] if the same wrapper already
208    /// owns a running child.
209    pub fn start(&self) -> Result<(), ProcessError> {
210        public_symbols::rp_native_process_start_public(self)
211    }
212
213    fn start_impl(&self) -> Result<(), ProcessError> {
214        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
215        let mut guard = self.child.lock().expect("child mutex poisoned");
216        if guard.is_some() {
217            return Err(ProcessError::AlreadyStarted);
218        }
219
220        let mut command = self.build_command();
221        match self.config.stdin_mode {
222            StdinMode::Inherit => {}
223            StdinMode::Piped => {
224                command.stdin(Stdio::piped());
225            }
226            StdinMode::Null => {
227                command.stdin(Stdio::null());
228            }
229        }
230        if self.config.capture {
231            command.stdout(Stdio::piped());
232            command.stderr(Stdio::piped());
233        }
234
235        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
236        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
237        #[cfg(windows)]
238        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
239            .map_err(ProcessError::Spawn)?;
240        if self.config.capture {
241            let stdout = child.stdout.take().expect("stdout pipe missing");
242            let stderr = child.stderr.take().expect("stderr pipe missing");
243            #[cfg(windows)]
244            {
245                use std::os::windows::io::AsRawHandle;
246                let mut handles = self
247                    .capture_pipe_handles
248                    .lock()
249                    .expect("capture pipe handles mutex poisoned");
250                handles.stdout = Some(stdout.as_raw_handle() as usize);
251                handles.stderr = Some(stderr.as_raw_handle() as usize);
252            }
253            self.spawn_reader(
254                stdout,
255                StreamKind::Stdout,
256                StreamKind::Stdout,
257                self.pipe_done_callback(StreamKind::Stdout),
258            );
259            self.spawn_reader(
260                stderr,
261                StreamKind::Stderr,
262                match self.config.stderr_mode {
263                    StderrMode::Stdout => StreamKind::Stdout,
264                    StderrMode::Pipe => StreamKind::Stderr,
265                },
266                self.pipe_done_callback(StreamKind::Stderr),
267            );
268        }
269        *guard = Some(ChildState {
270            child,
271            #[cfg(windows)]
272            _job: job,
273        });
274        drop(guard);
275        self.spawn_exit_waiter();
276        Ok(())
277    }
278
279    /// Background thread that polls for process exit and stores the exit code
280    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
281    fn spawn_exit_waiter(&self) {
282        let child = Arc::clone(&self.child);
283        let shared = Arc::clone(&self.shared);
284        thread::spawn(move || {
285            loop {
286                if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
287                    return;
288                }
289                {
290                    let mut guard = child.lock().expect("child mutex poisoned");
291                    if let Some(child_state) = guard.as_mut() {
292                        match child_state.child.try_wait() {
293                            Ok(Some(status)) => {
294                                let code = exit_code(status);
295                                shared.returncode.store(code as i64, Ordering::Release);
296                                shared.condvar.notify_all();
297                                return;
298                            }
299                            Ok(None) => {}
300                            Err(_) => return,
301                        }
302                    } else {
303                        return;
304                    }
305                }
306                // #199: intentional — capture thread polling for
307                // child-exit. `try_wait` is non-blocking by design;
308                // we can't block here because the thread also drains
309                // pipe state alongside the exit check. 10ms keeps the
310                // CPU cost negligible while staying responsive.
311                thread::sleep(Duration::from_millis(10));
312            }
313        });
314    }
315
316    /// Write bytes to the child's stdin and then close stdin.
317    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
318        let mut guard = self.child.lock().expect("child mutex poisoned");
319        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
320        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
321        use std::io::Write;
322        stdin.write_all(data).map_err(ProcessError::Io)?;
323        stdin.flush().map_err(ProcessError::Io)?;
324        drop(child.stdin.take());
325        Ok(())
326    }
327
328    /// Write to the child's stdin without closing it afterwards, so the
329    /// caller can issue additional writes. Used by interactive
330    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
331    /// stdin open across multiple client input frames.
332    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
333        let mut guard = self.child.lock().expect("child mutex poisoned");
334        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
335        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
336        use std::io::Write;
337        stdin.write_all(data).map_err(ProcessError::Io)?;
338        stdin.flush().map_err(ProcessError::Io)?;
339        Ok(())
340    }
341
342    /// Explicitly close the child's stdin (signals EOF to the child).
343    /// Idempotent: returns Ok if stdin was already closed.
344    pub fn close_stdin(&self) -> Result<(), ProcessError> {
345        let mut guard = self.child.lock().expect("child mutex poisoned");
346        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
347        drop(child.stdin.take());
348        Ok(())
349    }
350
351    /// Check whether the child has exited without blocking.
352    ///
353    /// Returns `Ok(None)` while the process is still running.
354    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
355        // Fast path: check atomic set by background waiter thread.
356        if let Some(code) = self.returncode() {
357            return Ok(Some(code));
358        }
359        let mut guard = self.child.lock().expect("child mutex poisoned");
360        let Some(child_state) = guard.as_mut() else {
361            return Ok(self.returncode());
362        };
363        let child = &mut child_state.child;
364        let status = child.try_wait().map_err(ProcessError::Io)?;
365        if let Some(status) = status {
366            let code = exit_code(status);
367            self.set_returncode(code);
368            return Ok(Some(code));
369        }
370        Ok(None)
371    }
372
373    // Preserve a stable Rust frame here in release user dumps.
374    #[inline(never)]
375    /// Wait for the child to exit.
376    ///
377    /// When `timeout` is `Some`, returns [`ProcessError::Timeout`] if the
378    /// child does not exit before the duration elapses.
379    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
380        public_symbols::rp_native_process_wait_public(self, timeout)
381    }
382
383    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
384        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
385        if self.child.lock().expect("child mutex poisoned").is_none() {
386            return self.returncode().ok_or(ProcessError::NotRunning);
387        }
388        // Fast path: already exited.
389        if let Some(code) = self.returncode() {
390            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
391            return Ok(code);
392        }
393        let start = Instant::now();
394        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
395        loop {
396            // Check returncode (set by exit-waiter thread via atomic + condvar).
397            let rc = self.shared.returncode.load(Ordering::Acquire);
398            if rc != RETURNCODE_NOT_SET {
399                drop(guard);
400                let code = rc as i32;
401                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
402                return Ok(code);
403            }
404            if let Some(limit) = timeout {
405                let elapsed = start.elapsed();
406                if elapsed >= limit {
407                    return Err(ProcessError::Timeout);
408                }
409                let remaining = limit - elapsed;
410                // Wait on condvar with timeout, capped at 50ms to recheck.
411                let wait_time = remaining.min(Duration::from_millis(50));
412                guard = self
413                    .shared
414                    .condvar
415                    .wait_timeout(guard, wait_time)
416                    .expect("queue mutex poisoned")
417                    .0;
418            } else {
419                // Wait on condvar with periodic recheck.
420                guard = self
421                    .shared
422                    .condvar
423                    .wait_timeout(guard, Duration::from_millis(50))
424                    .expect("queue mutex poisoned")
425                    .0;
426            }
427        }
428    }
429
430    // Preserve a stable Rust frame here in release user dumps.
431    #[inline(never)]
432    /// Forcefully terminate the child process.
433    pub fn kill(&self) -> Result<(), ProcessError> {
434        public_symbols::rp_native_process_kill_public(self)
435    }
436
437    fn kill_impl(&self) -> Result<(), ProcessError> {
438        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
439        {
440            let mut guard = self.child.lock().expect("child mutex poisoned");
441            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
442            child.kill().map_err(ProcessError::Io)?;
443            let status = child.wait().map_err(ProcessError::Io)?;
444            self.set_returncode(exit_code(status));
445        }
446        // On Windows, interrupt any pending blocking `read()` in the
447        // per-stream reader threads so they fall out of their loops
448        // immediately. This is what makes the grandchild-pipe-orphan
449        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
450        // that inherits the pipe and outlives uv) wake up in
451        // microseconds instead of waiting for the bounded-drain
452        // safety-net deadline below.
453        #[cfg(windows)]
454        self.cancel_capture_io();
455        // Synchronize with the per-stream reader threads so that by the
456        // time kill() returns, the capture queues have flipped from
457        // "blocked on read" to "closed" and downstream pollers (e.g.
458        // take_combined_line) observe EOS instead of timeout. Without
459        // this, callers that hit a wait()-timeout path see Python code
460        // raise TimeoutError, kill the child, then race the reader
461        // threads — a 10ms poll loop can miss the EOS flip entirely.
462        //
463        // The deadline is the safety-net: on Windows `CancelIoEx`
464        // above almost always wakes the readers first; on POSIX, and
465        // in any pathological Windows case where `CancelIoEx` doesn't
466        // fire, the deadline guarantees `kill()` still returns.
467        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
468            self,
469            kill_drain_deadline(),
470        );
471        Ok(())
472    }
473
474    /// Terminate the child process.
475    ///
476    /// This currently uses the same hard-kill path as [`Self::kill`].
477    pub fn terminate(&self) -> Result<(), ProcessError> {
478        self.kill()
479    }
480
    /// Ask the child's process group to shut down gracefully.
    ///
    /// - POSIX: sends `SIGTERM` to the process group (`kill(-pid, SIGTERM)`).
    ///   `ESRCH` (no such process group) is treated as success.
    /// - Windows: sends `CTRL_BREAK_EVENT` to the child's process group via
    ///   `GenerateConsoleCtrlEvent`. `ERROR_INVALID_HANDLE` (the child has
    ///   already exited or detached from the console) is treated as success.
    ///
    /// Both platforms require `ProcessConfig.create_process_group=true`:
    /// - On POSIX, so that `-pid` resolves to the child's own group.
    /// - On Windows, because the console event only routes to children spawned
    ///   with `CREATE_NEW_PROCESS_GROUP`.
    ///
    /// With the default `create_process_group=false`, the signal would reach
    /// the caller's own group instead. In that case the method silently
    /// returns `Ok(())` without signaling anything, and callers rely on the
    /// post-grace hard kill.
    ///
    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
    /// that `TerminationOutcome::SoftExit` becomes meaningful.
    ///
    /// # Errors
    ///
    /// Returns [`ProcessError::NotRunning`] if process groups are enabled but
    /// no child has been started, and [`ProcessError::Io`] for any other OS
    /// error from the signal call.
    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
        #[cfg(unix)]
        {
            if !self.config.create_process_group {
                return Ok(());
            }
            let pid = match self.pid() {
                // NOTE(review): `as i32` would wrap for a pid above
                // `i32::MAX`; real pids fit, but `libc::pid_t::try_from`
                // would make that assumption explicit.
                Some(p) => p as i32,
                None => return Err(ProcessError::NotRunning),
            };
            // SAFETY: `libc::kill` only takes integer arguments and does not
            // access any Rust-owned memory. The negated pid addresses the
            // whole process group rather than the single child.
            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
            if result != 0 {
                let err = std::io::Error::last_os_error();
                // ESRCH: the group no longer exists, so there is nothing left
                // to signal.
                if err.raw_os_error() != Some(libc::ESRCH) {
                    return Err(ProcessError::Io(err));
                }
            }
            Ok(())
        }
        #[cfg(windows)]
        {
            if !self.config.create_process_group {
                // GenerateConsoleCtrlEvent only routes to children
                // spawned with CREATE_NEW_PROCESS_GROUP, and the
                // event would otherwise hit the daemon's own group.
                // No-op so the hard-kill schedule still wins.
                return Ok(());
            }
            let pid = match self.pid() {
                Some(p) => p,
                None => return Err(ProcessError::NotRunning),
            };
            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
            // SAFETY: the FFI call is the standard Windows API; no
            // borrowed Rust state is involved.
            let ok = unsafe {
                winapi::um::wincon::GenerateConsoleCtrlEvent(
                    winapi::um::wincon::CTRL_BREAK_EVENT,
                    pid,
                )
            };
            if ok == 0 {
                let err = std::io::Error::last_os_error();
                // ERROR_INVALID_HANDLE (raw Win32 error code 6) means the
                // child has already exited or has detached from the
                // console. Treat it as success: the soft step's only goal
                // is to give the child a chance to exit cleanly, and a
                // dead/detached child does not need one.
                if err.raw_os_error() != Some(6) {
                    return Err(ProcessError::Io(err));
                }
            }
            Ok(())
        }
    }
550
551    // Preserve a stable Rust frame here in release user dumps.
552    #[inline(never)]
553    /// Close the process wrapper by terminating the child when it is running.
554    pub fn close(&self) -> Result<(), ProcessError> {
555        public_symbols::rp_native_process_close_public(self)
556    }
557
558    fn close_impl(&self) -> Result<(), ProcessError> {
559        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
560        if self.child.lock().expect("child mutex poisoned").is_none() {
561            return Ok(());
562        }
563        if self.poll()?.is_none() {
564            self.kill()?;
565        } else {
566            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
567        }
568        Ok(())
569    }
570
571    /// Return the child process id when the wrapper currently owns a child.
572    pub fn pid(&self) -> Option<u32> {
573        self.child
574            .lock()
575            .expect("child mutex poisoned")
576            .as_ref()
577            .map(|state| state.child.id())
578    }
579
580    /// Return the cached exit code when the child has exited.
581    pub fn returncode(&self) -> Option<i32> {
582        let v = self.shared.returncode.load(Ordering::Acquire);
583        if v == RETURNCODE_NOT_SET {
584            None
585        } else {
586            Some(v as i32)
587        }
588    }
589
590    /// Return whether captured output is queued for one stream.
591    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
592        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
593            return false;
594        }
595        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
596        match stream {
597            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
598            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
599        }
600    }
601
602    /// Return whether captured combined output is queued.
603    pub fn has_pending_combined(&self) -> bool {
604        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
605        !guard.combined_queue.is_empty()
606    }
607
608    /// Drain and return all queued output for one stream.
609    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
610        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
611            return Vec::new();
612        }
613        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
614        let queue = match stream {
615            StreamKind::Stdout => &mut guard.stdout_queue,
616            StreamKind::Stderr => &mut guard.stderr_queue,
617        };
618        queue.drain(..).collect()
619    }
620
621    /// Drain and return all queued combined output events.
622    pub fn drain_combined(&self) -> Vec<StreamEvent> {
623        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
624        guard.combined_queue.drain(..).collect()
625    }
626
627    /// Read the next captured chunk from one stream.
628    ///
629    /// Returns [`ReadStatus::Timeout`] when `timeout` elapses before output or
630    /// EOF is observed.
631    pub fn read_stream(
632        &self,
633        stream: StreamKind,
634        timeout: Option<Duration>,
635    ) -> ReadStatus<Vec<u8>> {
636        let deadline = timeout.map(|limit| Instant::now() + limit);
637        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
638
639        loop {
640            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
641                return ReadStatus::Eof;
642            }
643
644            let queue = match stream {
645                StreamKind::Stdout => &mut guard.stdout_queue,
646                StreamKind::Stderr => &mut guard.stderr_queue,
647            };
648            if let Some(line) = queue.pop_front() {
649                return ReadStatus::Line(line);
650            }
651
652            let closed = match stream {
653                StreamKind::Stdout => {
654                    if self.config.stderr_mode == StderrMode::Stdout {
655                        guard.stdout_closed && guard.stderr_closed
656                    } else {
657                        guard.stdout_closed
658                    }
659                }
660                StreamKind::Stderr => guard.stderr_closed,
661            };
662            if closed {
663                return ReadStatus::Eof;
664            }
665
666            match deadline {
667                Some(deadline) => {
668                    let now = Instant::now();
669                    if now >= deadline {
670                        return ReadStatus::Timeout;
671                    }
672                    let wait = deadline.saturating_duration_since(now);
673                    let result = self
674                        .shared
675                        .condvar
676                        .wait_timeout(guard, wait)
677                        .expect("queue mutex poisoned");
678                    guard = result.0;
679                    if result.1.timed_out() {
680                        return ReadStatus::Timeout;
681                    }
682                }
683                None => {
684                    guard = self
685                        .shared
686                        .condvar
687                        .wait(guard)
688                        .expect("queue mutex poisoned");
689                }
690            }
691        }
692    }
693
694    // Preserve a stable Rust frame here in release user dumps.
695    #[inline(never)]
696    /// Read the next captured combined stream event.
697    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
698        public_symbols::rp_native_process_read_combined_public(self, timeout)
699    }
700
701    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
702        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
703        let deadline = timeout.map(|limit| Instant::now() + limit);
704        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
705
706        loop {
707            if let Some(event) = guard.combined_queue.pop_front() {
708                return ReadStatus::Line(event);
709            }
710            if guard.stdout_closed && guard.stderr_closed {
711                return ReadStatus::Eof;
712            }
713
714            match deadline {
715                Some(deadline) => {
716                    let now = Instant::now();
717                    if now >= deadline {
718                        return ReadStatus::Timeout;
719                    }
720                    let wait = deadline.saturating_duration_since(now);
721                    let result = self
722                        .shared
723                        .condvar
724                        .wait_timeout(guard, wait)
725                        .expect("queue mutex poisoned");
726                    guard = result.0;
727                    if result.1.timed_out() {
728                        return ReadStatus::Timeout;
729                    }
730                }
731                None => {
732                    guard = self
733                        .shared
734                        .condvar
735                        .wait(guard)
736                        .expect("queue mutex poisoned");
737                }
738            }
739        }
740    }
741
742    /// Return the retained stdout history.
743    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
744        self.shared
745            .queues
746            .lock()
747            .expect("queue mutex poisoned")
748            .stdout_history
749            .clone()
750            .into_iter()
751            .collect()
752    }
753
754    fn captured_stdout_raw(&self) -> Vec<u8> {
755        self.shared
756            .queues
757            .lock()
758            .expect("queue mutex poisoned")
759            .stdout_raw
760            .clone()
761    }
762
763    /// Return the retained stderr history.
764    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
765        if self.config.stderr_mode == StderrMode::Stdout {
766            return Vec::new();
767        }
768        self.shared
769            .queues
770            .lock()
771            .expect("queue mutex poisoned")
772            .stderr_history
773            .clone()
774            .into_iter()
775            .collect()
776    }
777
778    fn captured_stderr_raw(&self) -> Vec<u8> {
779        if self.config.stderr_mode == StderrMode::Stdout {
780            return Vec::new();
781        }
782        self.shared
783            .queues
784            .lock()
785            .expect("queue mutex poisoned")
786            .stderr_raw
787            .clone()
788    }
789
790    /// Return the retained combined stdout/stderr event history.
791    pub fn captured_combined(&self) -> Vec<StreamEvent> {
792        self.shared
793            .queues
794            .lock()
795            .expect("queue mutex poisoned")
796            .combined_history
797            .clone()
798            .into_iter()
799            .collect()
800    }
801
802    /// Return the retained byte count for one captured stream.
803    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
804        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
805            return 0;
806        }
807        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
808        match stream {
809            StreamKind::Stdout => guard.stdout_history_bytes,
810            StreamKind::Stderr => guard.stderr_history_bytes,
811        }
812    }
813
814    /// Return the retained byte count for combined captured output.
815    pub fn captured_combined_bytes(&self) -> usize {
816        self.shared
817            .queues
818            .lock()
819            .expect("queue mutex poisoned")
820            .combined_history_bytes
821    }
822
823    /// Clear retained output history for one stream and return freed bytes.
824    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
825        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
826            return 0;
827        }
828        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
829        match stream {
830            StreamKind::Stdout => {
831                let released = guard.stdout_history_bytes;
832                guard.stdout_history.clear();
833                guard.stdout_raw.clear();
834                guard.stdout_history_bytes = 0;
835                released
836            }
837            StreamKind::Stderr => {
838                let released = guard.stderr_history_bytes;
839                guard.stderr_history.clear();
840                guard.stderr_raw.clear();
841                guard.stderr_history_bytes = 0;
842                released
843            }
844        }
845    }
846
847    /// Clear retained combined output history and return freed bytes.
848    pub fn clear_captured_combined(&self) -> usize {
849        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
850        let released = guard.combined_history_bytes;
851        guard.combined_history.clear();
852        guard.combined_history_bytes = 0;
853        released
854    }
855
    /// Build the `std::process::Command` described by `self.config`.
    ///
    /// The command starts from one of two sources. `CommandSpec::Shell` uses a
    /// shell invocation via `shell_command`. `CommandSpec::Argv` uses a direct
    /// argv, where the first element is the program.
    ///
    /// The configured working directory and environment are then applied. When
    /// `config.env` is set, the inherited environment is cleared first, so the
    /// child sees exactly those variables. Platform-specific process-group and
    /// priority settings are applied last.
    ///
    /// # Panics
    ///
    /// Panics if `CommandSpec::Argv` holds an empty argv, because `argv[0]` is
    /// indexed unconditionally. NOTE(review): presumably an empty argv is
    /// rejected before this is reached — confirm.
    fn build_command(&self) -> Command {
        let mut command = match &self.config.command {
            CommandSpec::Shell(command) => shell_command(command),
            CommandSpec::Argv(argv) => {
                // argv[0] is the program; everything after it becomes arguments.
                let mut command = Command::new(&argv[0]);
                if argv.len() > 1 {
                    command.args(&argv[1..]);
                }
                command
            }
        };
        if let Some(cwd) = &self.config.cwd {
            command.current_dir(cwd);
        }
        if let Some(env) = &self.config.env {
            // Replace rather than merge: the child gets only `env`.
            command.env_clear();
            command.envs(env.iter().map(|(k, v)| (k, v)));
        }
        #[cfg(windows)]
        {
            use std::os::windows::process::CommandExt;

            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
            // with CTRL_BREAK_EVENT route to this child's group
            // (rather than the daemon's group) — required for the
            // pipe-session soft-signal path on Windows (#130 M4).
            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
            let extra = if self.config.create_process_group {
                CREATE_NEW_PROCESS_GROUP
            } else {
                0
            };
            // OR together the caller's creation flags, the optional
            // process-group flag, and whatever `windows_priority_flags`
            // derives from `nice` (presumably a priority-class flag — confirm).
            let flags = self.config.creationflags.unwrap_or(0)
                | extra
                | windows_priority_flags(self.config.nice);
            // Only override std's default creation flags when a bit is set.
            if flags != 0 {
                command.creation_flags(flags);
            }
        }
        #[cfg(unix)]
        {
            // Copy the settings into locals so the `move` closure handed to
            // `pre_exec` (which must be 'static) does not borrow `self`.
            let create_process_group = self.config.create_process_group;
            let nice = self.config.nice;

            // Only install a pre-exec hook when there is work to do in the child.
            if create_process_group || nice.is_some() {
                use std::os::unix::process::CommandExt;

                // SAFETY: `pre_exec` runs the closure in the forked child just
                // before `exec`. The closure only calls `setpgid` and
                // `setpriority`, turns a failure into an `io::Error` from errno,
                // captures only the two locals copied above, and takes no locks.
                // Returning `Err` makes the spawn fail with that error.
                unsafe {
                    command.pre_exec(move || {
                        // setpgid(0, 0): put the child in a new process group
                        // whose id is the child's own pid.
                        if create_process_group && libc::setpgid(0, 0) == -1 {
                            return Err(std::io::Error::last_os_error());
                        }
                        if let Some(nice) = nice {
                            // who = 0 targets the calling (child) process.
                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
                            if result == -1 {
                                return Err(std::io::Error::last_os_error());
                            }
                        }
                        Ok(())
                    });
                }
            }
        }
        command
    }
921
922    fn spawn_reader<R>(
923        &self,
924        pipe: R,
925        source_stream: StreamKind,
926        visible_stream: StreamKind,
927        on_pipe_done: Box<dyn FnOnce() + Send>,
928    ) where
929        R: Read + Send + 'static,
930    {
931        let shared = Arc::clone(&self.shared);
932        thread::spawn(move || {
933            let mut reader = pipe;
934            let mut chunk = vec![0_u8; 65536];
935            let mut pending = Vec::new();
936
937            loop {
938                match reader.read(&mut chunk) {
939                    Ok(0) => break,
940                    Ok(n) => {
941                        append_raw(&shared, visible_stream, &chunk[..n]);
942                        let lines = feed_chunk(&mut pending, &chunk[..n]);
943                        emit_lines(&shared, visible_stream, lines);
944                    }
945                    Err(_) => break,
946                }
947            }
948
949            if !pending.is_empty() {
950                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
951            }
952
953            // Clear the parent-side pipe-handle slot under its mutex
954            // before dropping the reader. After this returns,
955            // `kill_impl` can no longer try to `CancelIoEx` on us, so
956            // it's safe for `reader`'s drop to close the HANDLE.
957            on_pipe_done();
958            drop(reader);
959
960            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
961            match source_stream {
962                StreamKind::Stdout => guard.stdout_closed = true,
963                StreamKind::Stderr => guard.stderr_closed = true,
964            }
965            shared.condvar.notify_all();
966        });
967    }
968
969    #[cfg(windows)]
970    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
971        let handles = Arc::clone(&self.capture_pipe_handles);
972        Box::new(move || {
973            let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
974            match stream {
975                StreamKind::Stdout => guard.stdout = None,
976                StreamKind::Stderr => guard.stderr = None,
977            }
978        })
979    }
980
981    #[cfg(not(windows))]
982    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
983        Box::new(|| {})
984    }
985
    /// Cancel any pending blocking `read()` on the parent-side capture
    /// pipes so the reader threads' `read()` calls return
    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
    /// break the grandchild-orphan deadlock without waiting on
    /// `wait_for_capture_completion_with_deadline`'s safety-net.
    ///
    /// Slots that are already `None` (their reader has finished) are skipped.
    /// The `CancelIoEx` return value is ignored, so cancellation is best-effort.
    #[cfg(windows)]
    fn cancel_capture_io(&self) {
        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
        use winapi::shared::ntdef::HANDLE;
        use winapi::um::ioapiset::CancelIoEx;
        // Hold the slot lock across both cancellations so neither reader can
        // clear its slot (and then close its HANDLE) while we are using it.
        let guard = self
            .capture_pipe_handles
            .lock()
            .expect("capture pipe handles mutex poisoned");
        if let Some(h) = guard.stdout {
            // SAFETY: the slot is `Some` only while the owning reader
            // thread still holds the `ChildStdout`, so the HANDLE is
            // valid for the duration of this call. The reader is
            // blocked in `lock()` on the same mutex if it's racing us
            // toward exit, so it cannot drop the pipe and close the
            // HANDLE until we return.
            // A null OVERLAPPED pointer asks CancelIoEx to cancel all
            // outstanding I/O on the handle.
            unsafe {
                CancelIoEx(h as HANDLE, std::ptr::null_mut());
            }
        }
        if let Some(h) = guard.stderr {
            // SAFETY: same argument as for stdout. The stderr reader clears
            // this slot under the same mutex before it drops its pipe, and we
            // still hold `guard`, so the HANDLE stays open for this call.
            unsafe {
                CancelIoEx(h as HANDLE, std::ptr::null_mut());
            }
        }
    }
1017
1018    fn set_returncode(&self, code: i32) {
1019        self.shared.returncode.store(code as i64, Ordering::Release);
1020        self.shared.condvar.notify_all();
1021    }
1022
1023    fn wait_for_capture_completion_impl(&self) {
1024        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1025        if !self.config.capture {
1026            return;
1027        }
1028
1029        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1030        while !(guard.stdout_closed && guard.stderr_closed) {
1031            guard = self
1032                .shared
1033                .condvar
1034                .wait(guard)
1035                .expect("queue mutex poisoned");
1036        }
1037    }
1038
1039    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
1040    /// Returns `true` if the reader threads flipped both closed flags on
1041    /// their own, `false` if the deadline elapsed first. On timeout the
1042    /// closed flags are force-set (and waiters notified) so downstream
1043    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
1044    /// thread that eventually unblocks after the OS releases the pipe
1045    /// will assign `closed = true` again, which is a harmless no-op.
1046    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1047        crate::rp_rust_debug_scope!(
1048            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1049        );
1050        if !self.config.capture {
1051            return true;
1052        }
1053
1054        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1055        while !(guard.stdout_closed && guard.stderr_closed) {
1056            let now = Instant::now();
1057            if now >= deadline {
1058                guard.stdout_closed = true;
1059                guard.stderr_closed = true;
1060                self.shared.condvar.notify_all();
1061                return false;
1062            }
1063            let (next_guard, result) = self
1064                .shared
1065                .condvar
1066                .wait_timeout(guard, deadline - now)
1067                .expect("queue mutex poisoned");
1068            guard = next_guard;
1069            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1070                guard.stdout_closed = true;
1071                guard.stderr_closed = true;
1072                self.shared.condvar.notify_all();
1073                return false;
1074            }
1075        }
1076        true
1077    }
1078}
1079
1080fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1081    if lines.is_empty() {
1082        return;
1083    }
1084    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1085    for line in lines {
1086        let line_len = line.len();
1087        match stream {
1088            StreamKind::Stdout => {
1089                guard.stdout_history_bytes += line_len;
1090                guard.stdout_history.push_back(line.clone());
1091                guard.stdout_queue.push_back(line.clone());
1092            }
1093            StreamKind::Stderr => {
1094                guard.stderr_history_bytes += line_len;
1095                guard.stderr_history.push_back(line.clone());
1096                guard.stderr_queue.push_back(line.clone());
1097            }
1098        }
1099        let event = StreamEvent { stream, line };
1100        guard.combined_history_bytes += line_len;
1101        guard.combined_history.push_back(event.clone());
1102        guard.combined_queue.push_back(event);
1103    }
1104    shared.condvar.notify_all();
1105}
1106
1107fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1108    if chunk.is_empty() {
1109        return;
1110    }
1111    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1112    match stream {
1113        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1114        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1115    }
1116}
1117
1118/// Run a command to completion while concurrently draining stdout and stderr.
1119///
1120/// The helper forces capture on regardless of `config.capture`, returns raw
1121/// stdout/stderr bytes, and kills the child before returning
1122/// [`ProcessError::Timeout`] when `timeout` elapses.
1123pub fn run_command(
1124    mut config: ProcessConfig,
1125    timeout: Option<Duration>,
1126) -> Result<RunOutput, ProcessError> {
1127    config.capture = true;
1128    let process = NativeProcess::new(config);
1129    process.start()?;
1130
1131    let exit_code = match process.wait(timeout) {
1132        Ok(code) => code,
1133        Err(ProcessError::Timeout) => {
1134            match process.kill() {
1135                Ok(()) | Err(ProcessError::NotRunning) => {}
1136                Err(error) => return Err(error),
1137            }
1138            return Err(ProcessError::Timeout);
1139        }
1140        Err(error) => return Err(error),
1141    };
1142
1143    Ok(RunOutput {
1144        stdout: process.captured_stdout_raw(),
1145        stderr: process.captured_stderr_raw(),
1146        exit_code,
1147    })
1148}
1149
/// Build a `Command` that runs `command` through the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`.
///   * `/D` skips registry AutoRun commands.
///   * `/S` makes cmd strip exactly the outer pair of quotes added here.
///   * `/C` runs the string and then exits.
///
///   The tail is passed with `raw_arg`, so std applies no quoting or
///   escaping of its own.
/// * Elsewhere: `sh -lc <command>`. `-l` requests a login shell, which
///   typically reads profile files before running `command`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut cmd = Command::new("cmd");
        // Emit the whole tail as ONE raw argument. std separates every
        // argument, raw or not, with a space. Three separate `raw_arg` calls
        // therefore produced `/C " <command> "`. Once `/S` strips the outer
        // quotes, cmd ran `<command>` with a trailing space, so for example
        // `echo hi` printed "hi ".
        cmd.raw_arg(format!("/D /S /C \"{command}\""));
        cmd
    }
    #[cfg(not(windows))]
    {
        let mut cmd = Command::new("sh");
        cmd.arg("-lc").arg(command);
        cmd
    }
}
1168
1169#[cfg(test)]
1170mod tests;