Skip to main content

running_process/
lib.rs

1//! Cross-platform process execution, process-tree control, PTY handling, and
2//! broker integration primitives.
3//!
4//! The crate exposes a synchronous process API through [`NativeProcess`], a
5//! contained process-group helper through [`ContainedProcessGroup`], low-level
6//! spawn helpers through [`spawn()`] and [`spawn_daemon`], and optional
7//! daemon/broker modules behind feature flags.
8
9use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17pub mod console_detect;
18pub mod containment;
19mod helpers;
20#[cfg(feature = "originator-scan")]
21pub mod originator;
22// Wave 3+4 of #165: proto module + IPC client absorbed from the
23// former `running-process-proto` and `running-process-client` crates.
24// Both gated behind `feature = "client"`. The protobuf package
25// `running_process.daemon.v1` compiles to the file referenced below.
#[cfg(feature = "client")]
/// Prost-generated daemon protocol types used by the client transport.
///
/// Only compiled with the `client` feature. The Rust source is generated at
/// build time into `OUT_DIR` (presumably by the crate's build script — TODO
/// confirm) and spliced in with `include!`.
pub mod proto {
    /// Generated Rust bindings for the `running_process.daemon.v1` protobuf package.
    // Generated code carries no hand-written docs, so `missing_docs` is silenced here.
    #[allow(missing_docs)]
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
35
36#[cfg(feature = "client")]
37pub mod client;
38
39// Phase 0 of #228: v1 broker module — prost-generated wire types from
40// `proto/broker_v1_*.proto`. Gated behind `feature = "client"` because
41// prost itself is optional under that feature. Schemas are
42// FROZEN FOREVER once v1.0 ships.
43#[cfg(feature = "client")]
44pub mod broker;
45
46// Phase 1 of #228 (issue #230): maintenance subcommands exposed via
47// the `runpm` CLI. Currently just `release-handles` — a cross-platform
48// scaffold for the Windows worktree-teardown handle-race fix
49// (soldr#710). Gated behind `feature = "client"` because the CLI that
50// drives it is.
51#[cfg(feature = "client")]
52pub mod maintenance;
53
54#[cfg(feature = "client")]
55pub mod cleanup;
56
57// Lightweight tee sink primitives for callers that want transcript/log
58// fan-out without pulling in the full daemon runtime.
59#[cfg(feature = "telemetry")]
60#[path = "daemon/telemetry.rs"]
61pub mod telemetry;
62
63// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
64// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
65#[cfg(feature = "daemon")]
66/// Daemon runtime APIs and helpers enabled by the `daemon` feature.
67pub mod daemon;
68/// PTY-backed process APIs.
69pub mod pty;
70mod public_symbols;
71mod rust_debug;
72pub mod spawn;
73pub mod terminal_graphics;
74mod types;
75#[cfg(unix)]
76mod unix;
77#[cfg(windows)]
78mod windows;
79
80pub use console_detect::{ConsoleWindowInfo, monitor_console_windows};
81pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
82#[cfg(feature = "originator-scan")]
83pub use originator::{OriginatorProcessInfo, find_processes_by_originator};
84pub use rust_debug::{RustDebugScopeGuard, render_rust_debug_traces};
85pub use spawn::{
86    DaemonChild, SpawnStdio, SpawnedChild, StdioSource, spawn, spawn_daemon,
87    spawn_daemon_with_clear_env,
88};
89pub use terminal_graphics::{
90    CapabilityStatus, EvidenceStrength, GraphicsCapability, GraphicsProtocol, TerminalCapabilities,
91    TerminalCapabilityInput, TerminalGraphicsCapabilities, TerminalProbeEvidence,
92    current_terminal_capabilities, current_terminal_capabilities_with_timeout,
93    detect_terminal_capabilities,
94};
95pub use types::{
96    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
97    StreamEvent, StreamKind,
98};
99
100pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
101#[cfg(unix)]
102pub use unix::{UnixSignal, unix_set_priority, unix_signal_process, unix_signal_process_group};
103#[cfg(windows)]
104pub(crate) use windows::{
105    CapturePipeHandles, WindowsJobHandle, assign_child_to_windows_kill_on_close_job_impl,
106    windows_priority_flags,
107};
108
/// Open a Rust debug trace scope labelled `$label` for the rest of the
/// enclosing block.
///
/// Expands to a `let` binding that holds a `RustDebugScopeGuard` created with
/// the call site's `file!()` and `line!()`. The binding is named (not `_`), so
/// the guard is dropped at the end of the enclosing block rather than
/// immediately.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _rp_debug_scope_guard = $crate::RustDebugScopeGuard::enter(
            $label,
            file!(),
            line!(),
        );
    };
}
117
118#[derive(Default)]
119struct QueueState {
120    stdout_queue: VecDeque<Vec<u8>>,
121    stderr_queue: VecDeque<Vec<u8>>,
122    combined_queue: VecDeque<StreamEvent>,
123    stdout_history: VecDeque<Vec<u8>>,
124    stderr_history: VecDeque<Vec<u8>>,
125    combined_history: VecDeque<StreamEvent>,
126    stdout_raw: Vec<u8>,
127    stderr_raw: Vec<u8>,
128    stdout_history_bytes: usize,
129    stderr_history_bytes: usize,
130    combined_history_bytes: usize,
131    stdout_closed: bool,
132    stderr_closed: bool,
133}
134
135/// Sentinel value for returncode atomic: process has not exited yet.
136const RETURNCODE_NOT_SET: i64 = i64::MIN;
137
138struct SharedState {
139    queues: Mutex<QueueState>,
140    condvar: Condvar,
141    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
142    /// Updated by a background waiter thread — reading is lock-free.
143    returncode: AtomicI64,
144}
145
/// A spawned child together with OS resources that must live exactly as
/// long as the wrapper keeps the child.
struct ChildState {
    /// The `std` handle for the spawned process.
    child: Child,
    /// Windows job object the child was assigned to right after spawn;
    /// held here so it stays alive alongside the child.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
151
152impl SharedState {
153    fn new(capture: bool) -> Self {
154        let queues = QueueState {
155            stdout_closed: !capture,
156            stderr_closed: !capture,
157            ..QueueState::default()
158        };
159        Self {
160            queues: Mutex::new(queues),
161            condvar: Condvar::new(),
162            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
163        }
164    }
165}
166
167/// A cross-platform child process with optional output capture.
168///
169/// `NativeProcess` wraps [`std::process::Command`] with the crate's
170/// process-tree containment, capture draining, timeout, and terminal-control
171/// behavior. Methods are synchronous and are safe to call from ordinary
172/// blocking code.
173pub struct NativeProcess {
174    config: ProcessConfig,
175    child: Arc<Mutex<Option<ChildState>>>,
176    shared: Arc<SharedState>,
177    #[cfg(windows)]
178    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
179}
180
181impl NativeProcess {
182    /// Create a process wrapper from a [`ProcessConfig`].
183    ///
184    /// The child is not spawned until [`Self::start`] is called.
185    pub fn new(config: ProcessConfig) -> Self {
186        Self {
187            shared: Arc::new(SharedState::new(config.capture)),
188            child: Arc::new(Mutex::new(None)),
189            config,
190            #[cfg(windows)]
191            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
192        }
193    }
194
195    // Preserve a stable Rust frame here in release user dumps.
196    #[inline(never)]
197    /// Spawn the configured child process.
198    ///
199    /// Returns [`ProcessError::AlreadyStarted`] if the same wrapper already
200    /// owns a running child.
201    pub fn start(&self) -> Result<(), ProcessError> {
202        public_symbols::rp_native_process_start_public(self)
203    }
204
205    fn start_impl(&self) -> Result<(), ProcessError> {
206        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
207        let mut guard = self.child.lock().expect("child mutex poisoned");
208        if guard.is_some() {
209            return Err(ProcessError::AlreadyStarted);
210        }
211
212        let mut command = self.build_command();
213        match self.config.stdin_mode {
214            StdinMode::Inherit => {}
215            StdinMode::Piped => {
216                command.stdin(Stdio::piped());
217            }
218            StdinMode::Null => {
219                command.stdin(Stdio::null());
220            }
221        }
222        if self.config.capture {
223            command.stdout(Stdio::piped());
224            command.stderr(Stdio::piped());
225        }
226
227        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
228        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
229        #[cfg(windows)]
230        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
231            .map_err(ProcessError::Spawn)?;
232        if self.config.capture {
233            let stdout = child.stdout.take().expect("stdout pipe missing");
234            let stderr = child.stderr.take().expect("stderr pipe missing");
235            #[cfg(windows)]
236            {
237                use std::os::windows::io::AsRawHandle;
238                let mut handles = self
239                    .capture_pipe_handles
240                    .lock()
241                    .expect("capture pipe handles mutex poisoned");
242                handles.stdout = Some(stdout.as_raw_handle() as usize);
243                handles.stderr = Some(stderr.as_raw_handle() as usize);
244            }
245            self.spawn_reader(
246                stdout,
247                StreamKind::Stdout,
248                StreamKind::Stdout,
249                self.pipe_done_callback(StreamKind::Stdout),
250            );
251            self.spawn_reader(
252                stderr,
253                StreamKind::Stderr,
254                match self.config.stderr_mode {
255                    StderrMode::Stdout => StreamKind::Stdout,
256                    StderrMode::Pipe => StreamKind::Stderr,
257                },
258                self.pipe_done_callback(StreamKind::Stderr),
259            );
260        }
261        *guard = Some(ChildState {
262            child,
263            #[cfg(windows)]
264            _job: job,
265        });
266        drop(guard);
267        self.spawn_exit_waiter();
268        Ok(())
269    }
270
271    /// Background thread that polls for process exit and stores the exit code
272    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
273    fn spawn_exit_waiter(&self) {
274        let child = Arc::clone(&self.child);
275        let shared = Arc::clone(&self.shared);
276        thread::spawn(move || {
277            loop {
278                if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
279                    return;
280                }
281                {
282                    let mut guard = child.lock().expect("child mutex poisoned");
283                    if let Some(child_state) = guard.as_mut() {
284                        match child_state.child.try_wait() {
285                            Ok(Some(status)) => {
286                                let code = exit_code(status);
287                                shared.returncode.store(code as i64, Ordering::Release);
288                                shared.condvar.notify_all();
289                                return;
290                            }
291                            Ok(None) => {}
292                            Err(_) => return,
293                        }
294                    } else {
295                        return;
296                    }
297                }
298                // #199: intentional — capture thread polling for
299                // child-exit. `try_wait` is non-blocking by design;
300                // we can't block here because the thread also drains
301                // pipe state alongside the exit check. 10ms keeps the
302                // CPU cost negligible while staying responsive.
303                thread::sleep(Duration::from_millis(10));
304            }
305        });
306    }
307
308    /// Write bytes to the child's stdin and then close stdin.
309    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
310        let mut guard = self.child.lock().expect("child mutex poisoned");
311        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
312        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
313        use std::io::Write;
314        stdin.write_all(data).map_err(ProcessError::Io)?;
315        stdin.flush().map_err(ProcessError::Io)?;
316        drop(child.stdin.take());
317        Ok(())
318    }
319
320    /// Write to the child's stdin without closing it afterwards, so the
321    /// caller can issue additional writes. Used by interactive
322    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
323    /// stdin open across multiple client input frames.
324    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
325        let mut guard = self.child.lock().expect("child mutex poisoned");
326        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
327        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
328        use std::io::Write;
329        stdin.write_all(data).map_err(ProcessError::Io)?;
330        stdin.flush().map_err(ProcessError::Io)?;
331        Ok(())
332    }
333
334    /// Explicitly close the child's stdin (signals EOF to the child).
335    /// Idempotent: returns Ok if stdin was already closed.
336    pub fn close_stdin(&self) -> Result<(), ProcessError> {
337        let mut guard = self.child.lock().expect("child mutex poisoned");
338        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
339        drop(child.stdin.take());
340        Ok(())
341    }
342
343    /// Check whether the child has exited without blocking.
344    ///
345    /// Returns `Ok(None)` while the process is still running.
346    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
347        // Fast path: check atomic set by background waiter thread.
348        if let Some(code) = self.returncode() {
349            return Ok(Some(code));
350        }
351        let mut guard = self.child.lock().expect("child mutex poisoned");
352        let Some(child_state) = guard.as_mut() else {
353            return Ok(self.returncode());
354        };
355        let child = &mut child_state.child;
356        let status = child.try_wait().map_err(ProcessError::Io)?;
357        if let Some(status) = status {
358            let code = exit_code(status);
359            self.set_returncode(code);
360            return Ok(Some(code));
361        }
362        Ok(None)
363    }
364
365    // Preserve a stable Rust frame here in release user dumps.
366    #[inline(never)]
367    /// Wait for the child to exit.
368    ///
369    /// When `timeout` is `Some`, returns [`ProcessError::Timeout`] if the
370    /// child does not exit before the duration elapses.
371    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
372        public_symbols::rp_native_process_wait_public(self, timeout)
373    }
374
375    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
376        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
377        if self.child.lock().expect("child mutex poisoned").is_none() {
378            return self.returncode().ok_or(ProcessError::NotRunning);
379        }
380        // Fast path: already exited.
381        if let Some(code) = self.returncode() {
382            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
383            return Ok(code);
384        }
385        let start = Instant::now();
386        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
387        loop {
388            // Check returncode (set by exit-waiter thread via atomic + condvar).
389            let rc = self.shared.returncode.load(Ordering::Acquire);
390            if rc != RETURNCODE_NOT_SET {
391                drop(guard);
392                let code = rc as i32;
393                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
394                return Ok(code);
395            }
396            if let Some(limit) = timeout {
397                let elapsed = start.elapsed();
398                if elapsed >= limit {
399                    return Err(ProcessError::Timeout);
400                }
401                let remaining = limit - elapsed;
402                // Wait on condvar with timeout, capped at 50ms to recheck.
403                let wait_time = remaining.min(Duration::from_millis(50));
404                guard = self
405                    .shared
406                    .condvar
407                    .wait_timeout(guard, wait_time)
408                    .expect("queue mutex poisoned")
409                    .0;
410            } else {
411                // Wait on condvar with periodic recheck.
412                guard = self
413                    .shared
414                    .condvar
415                    .wait_timeout(guard, Duration::from_millis(50))
416                    .expect("queue mutex poisoned")
417                    .0;
418            }
419        }
420    }
421
422    // Preserve a stable Rust frame here in release user dumps.
423    #[inline(never)]
424    /// Forcefully terminate the child process.
425    pub fn kill(&self) -> Result<(), ProcessError> {
426        public_symbols::rp_native_process_kill_public(self)
427    }
428
429    fn kill_impl(&self) -> Result<(), ProcessError> {
430        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
431        {
432            let mut guard = self.child.lock().expect("child mutex poisoned");
433            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
434            child.kill().map_err(ProcessError::Io)?;
435            let status = child.wait().map_err(ProcessError::Io)?;
436            self.set_returncode(exit_code(status));
437        }
438        // On Windows, interrupt any pending blocking `read()` in the
439        // per-stream reader threads so they fall out of their loops
440        // immediately. This is what makes the grandchild-pipe-orphan
441        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
442        // that inherits the pipe and outlives uv) wake up in
443        // microseconds instead of waiting for the bounded-drain
444        // safety-net deadline below.
445        #[cfg(windows)]
446        self.cancel_capture_io();
447        // Synchronize with the per-stream reader threads so that by the
448        // time kill() returns, the capture queues have flipped from
449        // "blocked on read" to "closed" and downstream pollers (e.g.
450        // take_combined_line) observe EOS instead of timeout. Without
451        // this, callers that hit a wait()-timeout path see Python code
452        // raise TimeoutError, kill the child, then race the reader
453        // threads — a 10ms poll loop can miss the EOS flip entirely.
454        //
455        // The deadline is the safety-net: on Windows `CancelIoEx`
456        // above almost always wakes the readers first; on POSIX, and
457        // in any pathological Windows case where `CancelIoEx` doesn't
458        // fire, the deadline guarantees `kill()` still returns.
459        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
460            self,
461            kill_drain_deadline(),
462        );
463        Ok(())
464    }
465
466    /// Terminate the child process.
467    ///
468    /// This currently uses the same hard-kill path as [`Self::kill`].
469    pub fn terminate(&self) -> Result<(), ProcessError> {
470        self.kill()
471    }
472
473    /// Send the OS-appropriate soft termination signal to the child's
474    /// process group (POSIX: SIGTERM to `-pid`; Windows: no soft path
475    /// implemented yet — returns Ok without doing anything so callers
476    /// can run the same code on both platforms and rely on the post-
477    /// grace hard kill).
478    ///
479    /// Requires `ProcessConfig.create_process_group=true` on POSIX so
480    /// that `-pid` resolves to the child's own group. With the default
481    /// `create_process_group=false`, the kill would walk back to the
482    /// caller's group; the method silently no-ops in that case to avoid
483    /// signaling the wrong tree.
484    ///
485    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
486    /// that `TerminationOutcome::SoftExit` becomes meaningful on POSIX.
487    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
488        #[cfg(unix)]
489        {
490            if !self.config.create_process_group {
491                return Ok(());
492            }
493            let pid = match self.pid() {
494                Some(p) => p as i32,
495                None => return Err(ProcessError::NotRunning),
496            };
497            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
498            if result != 0 {
499                let err = std::io::Error::last_os_error();
500                if err.raw_os_error() != Some(libc::ESRCH) {
501                    return Err(ProcessError::Io(err));
502                }
503            }
504            Ok(())
505        }
506        #[cfg(windows)]
507        {
508            if !self.config.create_process_group {
509                // GenerateConsoleCtrlEvent only routes to children
510                // spawned with CREATE_NEW_PROCESS_GROUP, and the
511                // event would otherwise hit the daemon's own group.
512                // No-op so the hard-kill schedule still wins.
513                return Ok(());
514            }
515            let pid = match self.pid() {
516                Some(p) => p,
517                None => return Err(ProcessError::NotRunning),
518            };
519            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
520            // SAFETY: the FFI call is the standard Windows API; no
521            // borrowed Rust state is involved.
522            let ok = unsafe {
523                winapi::um::wincon::GenerateConsoleCtrlEvent(
524                    winapi::um::wincon::CTRL_BREAK_EVENT,
525                    pid,
526                )
527            };
528            if ok == 0 {
529                let err = std::io::Error::last_os_error();
530                // ERROR_INVALID_HANDLE means the child has already
531                // exited or has detached from the console — treat as
532                // success because the soft step's only goal is to
533                // give the child a chance to exit cleanly, and a
534                // dead/detached child does not need one.
535                if err.raw_os_error() != Some(6) {
536                    return Err(ProcessError::Io(err));
537                }
538            }
539            Ok(())
540        }
541    }
542
543    // Preserve a stable Rust frame here in release user dumps.
544    #[inline(never)]
545    /// Close the process wrapper by terminating the child when it is running.
546    pub fn close(&self) -> Result<(), ProcessError> {
547        public_symbols::rp_native_process_close_public(self)
548    }
549
550    fn close_impl(&self) -> Result<(), ProcessError> {
551        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
552        if self.child.lock().expect("child mutex poisoned").is_none() {
553            return Ok(());
554        }
555        if self.poll()?.is_none() {
556            self.kill()?;
557        } else {
558            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
559        }
560        Ok(())
561    }
562
563    /// Return the child process id when the wrapper currently owns a child.
564    pub fn pid(&self) -> Option<u32> {
565        self.child
566            .lock()
567            .expect("child mutex poisoned")
568            .as_ref()
569            .map(|state| state.child.id())
570    }
571
572    /// Return the cached exit code when the child has exited.
573    pub fn returncode(&self) -> Option<i32> {
574        let v = self.shared.returncode.load(Ordering::Acquire);
575        if v == RETURNCODE_NOT_SET {
576            None
577        } else {
578            Some(v as i32)
579        }
580    }
581
582    /// Return whether captured output is queued for one stream.
583    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
584        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
585            return false;
586        }
587        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
588        match stream {
589            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
590            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
591        }
592    }
593
594    /// Return whether captured combined output is queued.
595    pub fn has_pending_combined(&self) -> bool {
596        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
597        !guard.combined_queue.is_empty()
598    }
599
600    /// Drain and return all queued output for one stream.
601    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
602        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
603            return Vec::new();
604        }
605        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
606        let queue = match stream {
607            StreamKind::Stdout => &mut guard.stdout_queue,
608            StreamKind::Stderr => &mut guard.stderr_queue,
609        };
610        queue.drain(..).collect()
611    }
612
613    /// Drain and return all queued combined output events.
614    pub fn drain_combined(&self) -> Vec<StreamEvent> {
615        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
616        guard.combined_queue.drain(..).collect()
617    }
618
619    /// Read the next captured chunk from one stream.
620    ///
621    /// Returns [`ReadStatus::Timeout`] when `timeout` elapses before output or
622    /// EOF is observed.
623    pub fn read_stream(
624        &self,
625        stream: StreamKind,
626        timeout: Option<Duration>,
627    ) -> ReadStatus<Vec<u8>> {
628        let deadline = timeout.map(|limit| Instant::now() + limit);
629        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
630
631        loop {
632            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
633                return ReadStatus::Eof;
634            }
635
636            let queue = match stream {
637                StreamKind::Stdout => &mut guard.stdout_queue,
638                StreamKind::Stderr => &mut guard.stderr_queue,
639            };
640            if let Some(line) = queue.pop_front() {
641                return ReadStatus::Line(line);
642            }
643
644            let closed = match stream {
645                StreamKind::Stdout => {
646                    if self.config.stderr_mode == StderrMode::Stdout {
647                        guard.stdout_closed && guard.stderr_closed
648                    } else {
649                        guard.stdout_closed
650                    }
651                }
652                StreamKind::Stderr => guard.stderr_closed,
653            };
654            if closed {
655                return ReadStatus::Eof;
656            }
657
658            match deadline {
659                Some(deadline) => {
660                    let now = Instant::now();
661                    if now >= deadline {
662                        return ReadStatus::Timeout;
663                    }
664                    let wait = deadline.saturating_duration_since(now);
665                    let result = self
666                        .shared
667                        .condvar
668                        .wait_timeout(guard, wait)
669                        .expect("queue mutex poisoned");
670                    guard = result.0;
671                    if result.1.timed_out() {
672                        return ReadStatus::Timeout;
673                    }
674                }
675                None => {
676                    guard = self
677                        .shared
678                        .condvar
679                        .wait(guard)
680                        .expect("queue mutex poisoned");
681                }
682            }
683        }
684    }
685
686    // Preserve a stable Rust frame here in release user dumps.
687    #[inline(never)]
688    /// Read the next captured combined stream event.
689    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
690        public_symbols::rp_native_process_read_combined_public(self, timeout)
691    }
692
693    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
694        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
695        let deadline = timeout.map(|limit| Instant::now() + limit);
696        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
697
698        loop {
699            if let Some(event) = guard.combined_queue.pop_front() {
700                return ReadStatus::Line(event);
701            }
702            if guard.stdout_closed && guard.stderr_closed {
703                return ReadStatus::Eof;
704            }
705
706            match deadline {
707                Some(deadline) => {
708                    let now = Instant::now();
709                    if now >= deadline {
710                        return ReadStatus::Timeout;
711                    }
712                    let wait = deadline.saturating_duration_since(now);
713                    let result = self
714                        .shared
715                        .condvar
716                        .wait_timeout(guard, wait)
717                        .expect("queue mutex poisoned");
718                    guard = result.0;
719                    if result.1.timed_out() {
720                        return ReadStatus::Timeout;
721                    }
722                }
723                None => {
724                    guard = self
725                        .shared
726                        .condvar
727                        .wait(guard)
728                        .expect("queue mutex poisoned");
729                }
730            }
731        }
732    }
733
734    /// Return the retained stdout history.
735    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
736        self.shared
737            .queues
738            .lock()
739            .expect("queue mutex poisoned")
740            .stdout_history
741            .clone()
742            .into_iter()
743            .collect()
744    }
745
746    fn captured_stdout_raw(&self) -> Vec<u8> {
747        self.shared
748            .queues
749            .lock()
750            .expect("queue mutex poisoned")
751            .stdout_raw
752            .clone()
753    }
754
755    /// Return the retained stderr history.
756    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
757        if self.config.stderr_mode == StderrMode::Stdout {
758            return Vec::new();
759        }
760        self.shared
761            .queues
762            .lock()
763            .expect("queue mutex poisoned")
764            .stderr_history
765            .clone()
766            .into_iter()
767            .collect()
768    }
769
770    fn captured_stderr_raw(&self) -> Vec<u8> {
771        if self.config.stderr_mode == StderrMode::Stdout {
772            return Vec::new();
773        }
774        self.shared
775            .queues
776            .lock()
777            .expect("queue mutex poisoned")
778            .stderr_raw
779            .clone()
780    }
781
782    /// Return the retained combined stdout/stderr event history.
783    pub fn captured_combined(&self) -> Vec<StreamEvent> {
784        self.shared
785            .queues
786            .lock()
787            .expect("queue mutex poisoned")
788            .combined_history
789            .clone()
790            .into_iter()
791            .collect()
792    }
793
794    /// Return the retained byte count for one captured stream.
795    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
796        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
797            return 0;
798        }
799        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
800        match stream {
801            StreamKind::Stdout => guard.stdout_history_bytes,
802            StreamKind::Stderr => guard.stderr_history_bytes,
803        }
804    }
805
806    /// Return the retained byte count for combined captured output.
807    pub fn captured_combined_bytes(&self) -> usize {
808        self.shared
809            .queues
810            .lock()
811            .expect("queue mutex poisoned")
812            .combined_history_bytes
813    }
814
815    /// Clear retained output history for one stream and return freed bytes.
816    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
817        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
818            return 0;
819        }
820        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
821        match stream {
822            StreamKind::Stdout => {
823                let released = guard.stdout_history_bytes;
824                guard.stdout_history.clear();
825                guard.stdout_raw.clear();
826                guard.stdout_history_bytes = 0;
827                released
828            }
829            StreamKind::Stderr => {
830                let released = guard.stderr_history_bytes;
831                guard.stderr_history.clear();
832                guard.stderr_raw.clear();
833                guard.stderr_history_bytes = 0;
834                released
835            }
836        }
837    }
838
839    /// Clear retained combined output history and return freed bytes.
840    pub fn clear_captured_combined(&self) -> usize {
841        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
842        let released = guard.combined_history_bytes;
843        guard.combined_history.clear();
844        guard.combined_history_bytes = 0;
845        released
846    }
847
    /// Build the `std::process::Command` described by `self.config` without
    /// spawning it.
    ///
    /// * `CommandSpec::Shell` goes through `shell_command`; `CommandSpec::Argv`
    ///   uses `argv[0]` as the program and the remaining entries as arguments.
    /// * `cwd`, when set, becomes the child's working directory.
    /// * `env`, when set, replaces the inherited environment entirely
    ///   (`env_clear` followed by `envs`).
    /// * Windows: `creationflags`, `CREATE_NEW_PROCESS_GROUP` (when
    ///   `create_process_group` is set) and `windows_priority_flags(nice)` are
    ///   OR-ed together and applied only if the result is non-zero.
    /// * Unix: a `pre_exec` hook in the forked child moves it into its own
    ///   process group and/or sets its niceness; if the hook fails, spawning
    ///   fails with that OS error.
    ///
    /// NOTE(review): an empty `CommandSpec::Argv` panics on `argv[0]`;
    /// confirm callers reject empty argv before reaching this point.
    fn build_command(&self) -> Command {
        let mut command = match &self.config.command {
            CommandSpec::Shell(command) => shell_command(command),
            CommandSpec::Argv(argv) => {
                let mut command = Command::new(&argv[0]);
                if argv.len() > 1 {
                    command.args(&argv[1..]);
                }
                command
            }
        };
        if let Some(cwd) = &self.config.cwd {
            command.current_dir(cwd);
        }
        if let Some(env) = &self.config.env {
            // Start from an empty environment so the child sees exactly
            // the configured variables and nothing inherited.
            command.env_clear();
            command.envs(env.iter().map(|(k, v)| (k, v)));
        }
        #[cfg(windows)]
        {
            use std::os::windows::process::CommandExt;

            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
            // with CTRL_BREAK_EVENT route to this child's group
            // (rather than the daemon's group) — required for the
            // pipe-session soft-signal path on Windows (#130 M4).
            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
            let extra = if self.config.create_process_group {
                CREATE_NEW_PROCESS_GROUP
            } else {
                0
            };
            let flags = self.config.creationflags.unwrap_or(0)
                | extra
                | windows_priority_flags(self.config.nice);
            // Leave the default creation flags alone when nothing was requested.
            if flags != 0 {
                command.creation_flags(flags);
            }
        }
        #[cfg(unix)]
        {
            // Copy the settings out of `self` so the `pre_exec` closure
            // owns them and does not borrow the config.
            let create_process_group = self.config.create_process_group;
            let nice = self.config.nice;

            // Only install a hook when there is work for it to do.
            if create_process_group || nice.is_some() {
                use std::os::unix::process::CommandExt;

                // SAFETY: the closure runs in the forked child between
                // `fork` and `exec`, where only async-signal-safe work is
                // permitted. It only issues the `setpgid`/`setpriority`
                // syscalls and reads errno via `last_os_error`, using the
                // two values it captured by move; it takes no locks.
                // NOTE(review): `setpriority` is not on POSIX's
                // async-signal-safe list, although it is a thin syscall
                // wrapper on common libcs — confirm this is acceptable.
                unsafe {
                    command.pre_exec(move || {
                        // setpgid(0, 0): make the child the leader of a new
                        // process group whose id is its own pid.
                        if create_process_group && libc::setpgid(0, 0) == -1 {
                            return Err(std::io::Error::last_os_error());
                        }
                        if let Some(nice) = nice {
                            // Sets the child's niceness to `nice` as an
                            // absolute value (not an increment).
                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
                            if result == -1 {
                                return Err(std::io::Error::last_os_error());
                            }
                        }
                        Ok(())
                    });
                }
            }
        }
        command
    }
913
914    fn spawn_reader<R>(
915        &self,
916        pipe: R,
917        source_stream: StreamKind,
918        visible_stream: StreamKind,
919        on_pipe_done: Box<dyn FnOnce() + Send>,
920    ) where
921        R: Read + Send + 'static,
922    {
923        let shared = Arc::clone(&self.shared);
924        thread::spawn(move || {
925            let mut reader = pipe;
926            let mut chunk = vec![0_u8; 65536];
927            let mut pending = Vec::new();
928
929            loop {
930                match reader.read(&mut chunk) {
931                    Ok(0) => break,
932                    Ok(n) => {
933                        append_raw(&shared, visible_stream, &chunk[..n]);
934                        let lines = feed_chunk(&mut pending, &chunk[..n]);
935                        emit_lines(&shared, visible_stream, lines);
936                    }
937                    Err(_) => break,
938                }
939            }
940
941            if !pending.is_empty() {
942                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
943            }
944
945            // Clear the parent-side pipe-handle slot under its mutex
946            // before dropping the reader. After this returns,
947            // `kill_impl` can no longer try to `CancelIoEx` on us, so
948            // it's safe for `reader`'s drop to close the HANDLE.
949            on_pipe_done();
950            drop(reader);
951
952            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
953            match source_stream {
954                StreamKind::Stdout => guard.stdout_closed = true,
955                StreamKind::Stderr => guard.stderr_closed = true,
956            }
957            shared.condvar.notify_all();
958        });
959    }
960
961    #[cfg(windows)]
962    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
963        let handles = Arc::clone(&self.capture_pipe_handles);
964        Box::new(move || {
965            let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
966            match stream {
967                StreamKind::Stdout => guard.stdout = None,
968                StreamKind::Stderr => guard.stderr = None,
969            }
970        })
971    }
972
973    #[cfg(not(windows))]
974    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
975        Box::new(|| {})
976    }
977
    /// Cancel any pending blocking `read()` on the parent-side capture
    /// pipes so the reader threads' `read()` calls return
    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
    /// break the grandchild-orphan deadlock without waiting on
    /// `wait_for_capture_completion_with_deadline`'s safety-net.
    ///
    /// Best-effort: the `CancelIoEx` return values are ignored, so a slot
    /// with no read in flight, or a failed cancel, is silently skipped.
    /// Slots that are `None` (reader already finished) are not touched.
    #[cfg(windows)]
    fn cancel_capture_io(&self) {
        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
        use winapi::shared::ntdef::HANDLE;
        use winapi::um::ioapiset::CancelIoEx;
        // Held for the rest of the function: a reader thread must take this
        // same lock to clear its slot before it drops (and closes) its pipe.
        let guard = self
            .capture_pipe_handles
            .lock()
            .expect("capture pipe handles mutex poisoned");
        if let Some(h) = guard.stdout {
            // SAFETY: the slot is `Some` only while the owning reader
            // thread still holds the `ChildStdout`, so the HANDLE is
            // valid for the duration of this call. The reader is
            // blocked in `lock()` on the same mutex if it's racing us
            // toward exit, so it cannot drop the pipe and close the
            // HANDLE until we return.
            // A null OVERLAPPED pointer cancels every pending I/O request
            // on the handle, whichever thread issued it.
            unsafe {
                CancelIoEx(h as HANDLE, std::ptr::null_mut());
            }
        }
        if let Some(h) = guard.stderr {
            // SAFETY: same invariant as the stdout slot — `Some` means the
            // stderr reader still owns the pipe, and it cannot clear the
            // slot (and then close the HANDLE) while `guard` is held.
            unsafe {
                CancelIoEx(h as HANDLE, std::ptr::null_mut());
            }
        }
    }
1009
1010    fn set_returncode(&self, code: i32) {
1011        self.shared.returncode.store(code as i64, Ordering::Release);
1012        self.shared.condvar.notify_all();
1013    }
1014
1015    fn wait_for_capture_completion_impl(&self) {
1016        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1017        if !self.config.capture {
1018            return;
1019        }
1020
1021        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1022        while !(guard.stdout_closed && guard.stderr_closed) {
1023            guard = self
1024                .shared
1025                .condvar
1026                .wait(guard)
1027                .expect("queue mutex poisoned");
1028        }
1029    }
1030
1031    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
1032    /// Returns `true` if the reader threads flipped both closed flags on
1033    /// their own, `false` if the deadline elapsed first. On timeout the
1034    /// closed flags are force-set (and waiters notified) so downstream
1035    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
1036    /// thread that eventually unblocks after the OS releases the pipe
1037    /// will assign `closed = true` again, which is a harmless no-op.
1038    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1039        crate::rp_rust_debug_scope!(
1040            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1041        );
1042        if !self.config.capture {
1043            return true;
1044        }
1045
1046        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1047        while !(guard.stdout_closed && guard.stderr_closed) {
1048            let now = Instant::now();
1049            if now >= deadline {
1050                guard.stdout_closed = true;
1051                guard.stderr_closed = true;
1052                self.shared.condvar.notify_all();
1053                return false;
1054            }
1055            let (next_guard, result) = self
1056                .shared
1057                .condvar
1058                .wait_timeout(guard, deadline - now)
1059                .expect("queue mutex poisoned");
1060            guard = next_guard;
1061            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1062                guard.stdout_closed = true;
1063                guard.stderr_closed = true;
1064                self.shared.condvar.notify_all();
1065                return false;
1066            }
1067        }
1068        true
1069    }
1070}
1071
1072fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1073    if lines.is_empty() {
1074        return;
1075    }
1076    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1077    for line in lines {
1078        let line_len = line.len();
1079        match stream {
1080            StreamKind::Stdout => {
1081                guard.stdout_history_bytes += line_len;
1082                guard.stdout_history.push_back(line.clone());
1083                guard.stdout_queue.push_back(line.clone());
1084            }
1085            StreamKind::Stderr => {
1086                guard.stderr_history_bytes += line_len;
1087                guard.stderr_history.push_back(line.clone());
1088                guard.stderr_queue.push_back(line.clone());
1089            }
1090        }
1091        let event = StreamEvent { stream, line };
1092        guard.combined_history_bytes += line_len;
1093        guard.combined_history.push_back(event.clone());
1094        guard.combined_queue.push_back(event);
1095    }
1096    shared.condvar.notify_all();
1097}
1098
1099fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1100    if chunk.is_empty() {
1101        return;
1102    }
1103    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1104    match stream {
1105        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1106        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1107    }
1108}
1109
1110/// Run a command to completion while concurrently draining stdout and stderr.
1111///
1112/// The helper forces capture on regardless of `config.capture`, returns raw
1113/// stdout/stderr bytes, and kills the child before returning
1114/// [`ProcessError::Timeout`] when `timeout` elapses.
1115pub fn run_command(
1116    mut config: ProcessConfig,
1117    timeout: Option<Duration>,
1118) -> Result<RunOutput, ProcessError> {
1119    config.capture = true;
1120    let process = NativeProcess::new(config);
1121    process.start()?;
1122
1123    let exit_code = match process.wait(timeout) {
1124        Ok(code) => code,
1125        Err(ProcessError::Timeout) => {
1126            match process.kill() {
1127                Ok(()) | Err(ProcessError::NotRunning) => {}
1128                Err(error) => return Err(error),
1129            }
1130            return Err(ProcessError::Timeout);
1131        }
1132        Err(error) => return Err(error),
1133    };
1134
1135    Ok(RunOutput {
1136        stdout: process.captured_stdout_raw(),
1137        stderr: process.captured_stderr_raw(),
1138        exit_code,
1139    })
1140}
1141
/// Build a [`Command`] that runs `command` through the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`. `/D` skips registry AutoRun
///   commands. `/S` makes cmd strip exactly the outer pair of quotes, so the
///   command text reaches cmd verbatim, with no Rust-side argument quoting.
/// * Other platforms: `sh -lc <command>` (`-l` requests a login shell where
///   the `sh` implementation supports it).
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut cmd = Command::new("cmd");
        // Keep the whole tail in ONE raw argument. `raw_arg` prefixes every
        // argument with a space, so splitting it up would place stray spaces
        // just inside the quotes. cmd keeps the trailing one, so
        // `set X=1` would set `X` to "1 " and `echo hi` would print "hi ".
        cmd.raw_arg(format!("/D /S /C \"{}\"", command));
        cmd
    }
    #[cfg(not(windows))]
    {
        let mut cmd = Command::new("sh");
        cmd.arg("-lc").arg(command);
        cmd
    }
}
1160
// Unit tests for this module live in the sibling `tests` module file and
// are compiled only for test builds.
#[cfg(test)]
mod tests;