Skip to main content

running_process/
lib.rs

1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
14// Wave 3+4 of #165: proto module + IPC client absorbed from the
15// former `running-process-proto` and `running-process-client` crates.
16// Both gated behind `feature = "client"`. The protobuf package
17// `running_process.daemon.v1` compiles to the file referenced below.
18#[cfg(feature = "client")]
19pub mod proto {
20    pub mod daemon {
21        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
22    }
23}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28// Lightweight tee sink primitives for callers that want transcript/log
29// fan-out without pulling in the full daemon runtime.
30#[cfg(feature = "telemetry")]
31#[path = "daemon/telemetry.rs"]
32pub mod telemetry;
33
34// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
35// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
36#[cfg(feature = "daemon")]
37pub mod daemon;
38pub mod pty;
39mod public_symbols;
40mod rust_debug;
41pub mod spawn;
42pub mod terminal_graphics;
43mod types;
44#[cfg(unix)]
45mod unix;
46#[cfg(windows)]
47mod windows;
48
49pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
50pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
51#[cfg(feature = "originator-scan")]
52pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
53pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
54pub use spawn::{
55    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
56    StdioSource,
57};
58pub use terminal_graphics::{
59    current_terminal_capabilities, current_terminal_capabilities_with_timeout,
60    detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
61    GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
62    TerminalProbeEvidence,
63};
64pub use types::{
65    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
66    StreamEvent, StreamKind,
67};
68
69pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
70#[cfg(unix)]
71pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
72#[cfg(windows)]
73pub(crate) use windows::{
74    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
75    WindowsJobHandle,
76};
77
/// Open a Rust debug scope for the remainder of the enclosing block.
///
/// Expands to a `let` statement that binds the value returned by
/// `RustDebugScopeGuard::enter($label, file!(), line!())`. Because the
/// expansion is a statement, the guard stays alive until the block that
/// invoked the macro ends. What the guard records on entry and on drop
/// is defined in the `rust_debug` module (not visible here).
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        // Bound to a named variable (not `_`) so the guard is not dropped
        // immediately.
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
85
/// Mutable capture state shared between the reader threads and the
/// consumer-facing `NativeProcess` methods. Always accessed through
/// `SharedState::queues`.
#[derive(Default)]
struct QueueState {
    /// Stdout lines not yet consumed by `read_stream` / `drain_stream`.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Stderr lines not yet consumed by `read_stream` / `drain_stream`.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Events not yet consumed by `read_combined` / `drain_combined`.
    combined_queue: VecDeque<StreamEvent>,
    /// Stdout lines returned by `captured_stdout`; emptied by
    /// `clear_captured_stream`.
    stdout_history: VecDeque<Vec<u8>>,
    /// Stderr lines returned by `captured_stderr`; emptied by
    /// `clear_captured_stream`.
    stderr_history: VecDeque<Vec<u8>>,
    /// Events returned by `captured_combined`; emptied by
    /// `clear_captured_combined`.
    combined_history: VecDeque<StreamEvent>,
    /// Bytes returned by `captured_stdout_raw`; presumably the unsplit
    /// stream as appended by `append_raw` — confirm against that helper.
    stdout_raw: Vec<u8>,
    /// Bytes returned by `captured_stderr_raw`; see `stdout_raw`.
    stderr_raw: Vec<u8>,
    /// Value reported by `captured_stream_bytes(Stdout)`; reset to 0 by
    /// `clear_captured_stream`.
    stdout_history_bytes: usize,
    /// Value reported by `captured_stream_bytes(Stderr)`; reset to 0 by
    /// `clear_captured_stream`.
    stderr_history_bytes: usize,
    /// Value reported by `captured_combined_bytes`; reset to 0 by
    /// `clear_captured_combined`.
    combined_history_bytes: usize,
    /// When true, readers report EOF once the stdout queue is empty.
    /// Initialised to `!capture` by `SharedState::new`.
    stdout_closed: bool,
    /// When true, readers report EOF once the stderr queue is empty.
    /// Initialised to `!capture` by `SharedState::new`.
    stderr_closed: bool,
}
102
/// Sentinel value for returncode atomic: process has not exited yet.
///
/// Exit codes are widened from `i32` before being stored and narrowed
/// back to `i32` when read, so `i64::MIN` lies outside the range of any
/// stored code.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
105
/// State shared (via `Arc`) between a `NativeProcess` and the background
/// threads it spawns.
struct SharedState {
    /// Capture queues, histories and closed flags.
    queues: Mutex<QueueState>,
    /// Paired with `queues`: readers and `wait` block on it, and the
    /// exit-waiter thread notifies it after storing the exit code.
    condvar: Condvar,
    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
    /// Updated by a background waiter thread — reading is lock-free.
    returncode: AtomicI64,
}
113
/// Everything that must stay alive for as long as the child is tracked.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Job handle obtained when the child is assigned to a
    /// kill-on-close job at start. It is never read; presumably it is
    /// held only so the job stays open for the child's lifetime —
    /// confirm against the `windows` module.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
119
120impl SharedState {
121    fn new(capture: bool) -> Self {
122        let queues = QueueState {
123            stdout_closed: !capture,
124            stderr_closed: !capture,
125            ..QueueState::default()
126        };
127        Self {
128            queues: Mutex::new(queues),
129            condvar: Condvar::new(),
130            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
131        }
132    }
133}
134
/// A child process driven through pipes, with optional capture of its
/// stdout/stderr and an exit code tracked in the background.
///
/// All methods take `&self`; mutable state lives behind the mutexes and
/// the atomic held in the fields below.
pub struct NativeProcess {
    /// Launch configuration supplied to `new`.
    config: ProcessConfig,
    /// `None` until `start` succeeds, then the running child's state.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues and exit code shared with the background threads.
    shared: Arc<SharedState>,
    /// Raw handles of the capture pipes, recorded at start. Presumably
    /// used to cancel pending reads during `kill` — confirm against the
    /// `windows` module.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
142
143impl NativeProcess {
144    pub fn new(config: ProcessConfig) -> Self {
145        Self {
146            shared: Arc::new(SharedState::new(config.capture)),
147            child: Arc::new(Mutex::new(None)),
148            config,
149            #[cfg(windows)]
150            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
151        }
152    }
153
    // Preserve a stable Rust frame here in release user dumps.
    /// Spawn the configured command.
    ///
    /// Delegates to `public_symbols::rp_native_process_start_public`,
    /// which presumably forwards to `start_impl` — confirm in
    /// `public_symbols`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate reports.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
159
160    fn start_impl(&self) -> Result<(), ProcessError> {
161        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
162        let mut guard = self.child.lock().expect("child mutex poisoned");
163        if guard.is_some() {
164            return Err(ProcessError::AlreadyStarted);
165        }
166
167        let mut command = self.build_command();
168        match self.config.stdin_mode {
169            StdinMode::Inherit => {}
170            StdinMode::Piped => {
171                command.stdin(Stdio::piped());
172            }
173            StdinMode::Null => {
174                command.stdin(Stdio::null());
175            }
176        }
177        if self.config.capture {
178            command.stdout(Stdio::piped());
179            command.stderr(Stdio::piped());
180        }
181
182        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
183        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
184        #[cfg(windows)]
185        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
186            .map_err(ProcessError::Spawn)?;
187        if self.config.capture {
188            let stdout = child.stdout.take().expect("stdout pipe missing");
189            let stderr = child.stderr.take().expect("stderr pipe missing");
190            #[cfg(windows)]
191            {
192                use std::os::windows::io::AsRawHandle;
193                let mut handles = self
194                    .capture_pipe_handles
195                    .lock()
196                    .expect("capture pipe handles mutex poisoned");
197                handles.stdout = Some(stdout.as_raw_handle() as usize);
198                handles.stderr = Some(stderr.as_raw_handle() as usize);
199            }
200            self.spawn_reader(
201                stdout,
202                StreamKind::Stdout,
203                StreamKind::Stdout,
204                self.pipe_done_callback(StreamKind::Stdout),
205            );
206            self.spawn_reader(
207                stderr,
208                StreamKind::Stderr,
209                match self.config.stderr_mode {
210                    StderrMode::Stdout => StreamKind::Stdout,
211                    StderrMode::Pipe => StreamKind::Stderr,
212                },
213                self.pipe_done_callback(StreamKind::Stderr),
214            );
215        }
216        *guard = Some(ChildState {
217            child,
218            #[cfg(windows)]
219            _job: job,
220        });
221        drop(guard);
222        self.spawn_exit_waiter();
223        Ok(())
224    }
225
226    /// Background thread that polls for process exit and stores the exit code
227    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
228    fn spawn_exit_waiter(&self) {
229        let child = Arc::clone(&self.child);
230        let shared = Arc::clone(&self.shared);
231        thread::spawn(move || loop {
232            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
233                return;
234            }
235            {
236                let mut guard = child.lock().expect("child mutex poisoned");
237                if let Some(child_state) = guard.as_mut() {
238                    match child_state.child.try_wait() {
239                        Ok(Some(status)) => {
240                            let code = exit_code(status);
241                            shared.returncode.store(code as i64, Ordering::Release);
242                            shared.condvar.notify_all();
243                            return;
244                        }
245                        Ok(None) => {}
246                        Err(_) => return,
247                    }
248                } else {
249                    return;
250                }
251            }
252            // #199: intentional — capture thread polling for
253            // child-exit. `try_wait` is non-blocking by design;
254            // we can't block here because the thread also drains
255            // pipe state alongside the exit check. 10ms keeps the
256            // CPU cost negligible while staying responsive.
257            thread::sleep(Duration::from_millis(10));
258        });
259    }
260
261    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
262        let mut guard = self.child.lock().expect("child mutex poisoned");
263        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
264        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
265        use std::io::Write;
266        stdin.write_all(data).map_err(ProcessError::Io)?;
267        stdin.flush().map_err(ProcessError::Io)?;
268        drop(child.stdin.take());
269        Ok(())
270    }
271
272    /// Write to the child's stdin without closing it afterwards, so the
273    /// caller can issue additional writes. Used by interactive
274    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
275    /// stdin open across multiple client input frames.
276    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
277        let mut guard = self.child.lock().expect("child mutex poisoned");
278        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
279        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
280        use std::io::Write;
281        stdin.write_all(data).map_err(ProcessError::Io)?;
282        stdin.flush().map_err(ProcessError::Io)?;
283        Ok(())
284    }
285
286    /// Explicitly close the child's stdin (signals EOF to the child).
287    /// Idempotent: returns Ok if stdin was already closed.
288    pub fn close_stdin(&self) -> Result<(), ProcessError> {
289        let mut guard = self.child.lock().expect("child mutex poisoned");
290        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
291        drop(child.stdin.take());
292        Ok(())
293    }
294
295    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
296        // Fast path: check atomic set by background waiter thread.
297        if let Some(code) = self.returncode() {
298            return Ok(Some(code));
299        }
300        let mut guard = self.child.lock().expect("child mutex poisoned");
301        let Some(child_state) = guard.as_mut() else {
302            return Ok(self.returncode());
303        };
304        let child = &mut child_state.child;
305        let status = child.try_wait().map_err(ProcessError::Io)?;
306        if let Some(status) = status {
307            let code = exit_code(status);
308            self.set_returncode(code);
309            return Ok(Some(code));
310        }
311        Ok(None)
312    }
313
    // Preserve a stable Rust frame here in release user dumps.
    /// Wait for the child to exit, optionally bounded by `timeout`.
    ///
    /// Delegates to `public_symbols::rp_native_process_wait_public`, which
    /// presumably forwards to `wait_impl` — confirm in `public_symbols`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate reports.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
319
320    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
321        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
322        if self.child.lock().expect("child mutex poisoned").is_none() {
323            return self.returncode().ok_or(ProcessError::NotRunning);
324        }
325        // Fast path: already exited.
326        if let Some(code) = self.returncode() {
327            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
328            return Ok(code);
329        }
330        let start = Instant::now();
331        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
332        loop {
333            // Check returncode (set by exit-waiter thread via atomic + condvar).
334            let rc = self.shared.returncode.load(Ordering::Acquire);
335            if rc != RETURNCODE_NOT_SET {
336                drop(guard);
337                let code = rc as i32;
338                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
339                return Ok(code);
340            }
341            if let Some(limit) = timeout {
342                let elapsed = start.elapsed();
343                if elapsed >= limit {
344                    return Err(ProcessError::Timeout);
345                }
346                let remaining = limit - elapsed;
347                // Wait on condvar with timeout, capped at 50ms to recheck.
348                let wait_time = remaining.min(Duration::from_millis(50));
349                guard = self
350                    .shared
351                    .condvar
352                    .wait_timeout(guard, wait_time)
353                    .expect("queue mutex poisoned")
354                    .0;
355            } else {
356                // Wait on condvar with periodic recheck.
357                guard = self
358                    .shared
359                    .condvar
360                    .wait_timeout(guard, Duration::from_millis(50))
361                    .expect("queue mutex poisoned")
362                    .0;
363            }
364        }
365    }
366
    // Preserve a stable Rust frame here in release user dumps.
    /// Forcibly stop the child.
    ///
    /// Delegates to `public_symbols::rp_native_process_kill_public`, which
    /// presumably forwards to `kill_impl` — confirm in `public_symbols`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate reports.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
372
    /// Kill the child, reap it, record its exit code, then wait (bounded
    /// by `kill_drain_deadline()`) for the capture readers to finish.
    ///
    /// # Errors
    ///
    /// * `ProcessError::NotRunning` if no child has been started.
    /// * `ProcessError::Io` if killing or waiting on the child fails.
    fn kill_impl(&self) -> Result<(), ProcessError> {
        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
        {
            // Scoped so the child mutex is released before the drain
            // wait below.
            let mut guard = self.child.lock().expect("child mutex poisoned");
            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
            child.kill().map_err(ProcessError::Io)?;
            let status = child.wait().map_err(ProcessError::Io)?;
            self.set_returncode(exit_code(status));
        }
        // On Windows, interrupt any pending blocking `read()` in the
        // per-stream reader threads so they fall out of their loops
        // immediately. This is what makes the grandchild-pipe-orphan
        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
        // that inherits the pipe and outlives uv) wake up in
        // microseconds instead of waiting for the bounded-drain
        // safety-net deadline below.
        #[cfg(windows)]
        self.cancel_capture_io();
        // Synchronize with the per-stream reader threads so that by the
        // time kill() returns, the capture queues have flipped from
        // "blocked on read" to "closed" and downstream pollers (e.g.
        // take_combined_line) observe EOS instead of timeout. Without
        // this, callers that hit a wait()-timeout path see Python code
        // raise TimeoutError, kill the child, then race the reader
        // threads — a 10ms poll loop can miss the EOS flip entirely.
        //
        // The deadline is the safety-net: on Windows `CancelIoEx`
        // above almost always wakes the readers first; on POSIX, and
        // in any pathological Windows case where `CancelIoEx` doesn't
        // fire, the deadline guarantees `kill()` still returns.
        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
            self,
            kill_drain_deadline(),
        );
        Ok(())
    }
409
    /// Stop the child. Currently identical to [`NativeProcess::kill`]:
    /// there is no softer path here (see `terminate_group_soft` for that).
    ///
    /// # Errors
    ///
    /// Returns whatever `kill` returns.
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
413
    /// Send the OS-appropriate soft termination request to the child's
    /// process group, so callers can run the same code on both platforms
    /// and rely on a later hard kill if the child ignores it.
    ///
    /// * POSIX: `SIGTERM` is sent to `-pid`. `ESRCH` (no such process
    ///   group) is treated as success.
    /// * Windows: `CTRL_BREAK_EVENT` is sent with
    ///   `GenerateConsoleCtrlEvent`. `ERROR_INVALID_HANDLE` (code 6) is
    ///   treated as success.
    ///
    /// Requires `ProcessConfig.create_process_group=true` on both
    /// platforms so that the request reaches the child's own group.
    /// With the default `create_process_group=false` it would target the
    /// caller's group instead, so the method silently no-ops in that
    /// case to avoid signaling the wrong tree.
    ///
    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
    /// that `TerminationOutcome::SoftExit` becomes meaningful.
    ///
    /// # Errors
    ///
    /// * `ProcessError::NotRunning` if no child has been started.
    /// * `ProcessError::Io` for any OS error other than the tolerated
    ///   ones listed above.
    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
        #[cfg(unix)]
        {
            if !self.config.create_process_group {
                return Ok(());
            }
            let pid = match self.pid() {
                Some(p) => p as i32,
                None => return Err(ProcessError::NotRunning),
            };
            // SAFETY: `kill` only takes plain integer arguments; no
            // borrowed Rust state is involved.
            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
            if result != 0 {
                let err = std::io::Error::last_os_error();
                // ESRCH: the group no longer exists, nothing to signal.
                if err.raw_os_error() != Some(libc::ESRCH) {
                    return Err(ProcessError::Io(err));
                }
            }
            Ok(())
        }
        #[cfg(windows)]
        {
            if !self.config.create_process_group {
                // GenerateConsoleCtrlEvent only routes to children
                // spawned with CREATE_NEW_PROCESS_GROUP, and the
                // event would otherwise hit the daemon's own group.
                // No-op so the hard-kill schedule still wins.
                return Ok(());
            }
            let pid = match self.pid() {
                Some(p) => p,
                None => return Err(ProcessError::NotRunning),
            };
            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
            // SAFETY: the FFI call is the standard Windows API; no
            // borrowed Rust state is involved.
            let ok = unsafe {
                winapi::um::wincon::GenerateConsoleCtrlEvent(
                    winapi::um::wincon::CTRL_BREAK_EVENT,
                    pid,
                )
            };
            if ok == 0 {
                let err = std::io::Error::last_os_error();
                // ERROR_INVALID_HANDLE means the child has already
                // exited or has detached from the console — treat as
                // success because the soft step's only goal is to
                // give the child a chance to exit cleanly, and a
                // dead/detached child does not need one.
                if err.raw_os_error() != Some(6) {
                    return Err(ProcessError::Io(err));
                }
            }
            Ok(())
        }
    }
483
    // Preserve a stable Rust frame here in release user dumps.
    /// Shut the process down and finish capture.
    ///
    /// Delegates to `public_symbols::rp_native_process_close_public`, which
    /// presumably forwards to `close_impl` — confirm in `public_symbols`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate reports.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
489
490    fn close_impl(&self) -> Result<(), ProcessError> {
491        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
492        if self.child.lock().expect("child mutex poisoned").is_none() {
493            return Ok(());
494        }
495        if self.poll()?.is_none() {
496            self.kill()?;
497        } else {
498            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
499        }
500        Ok(())
501    }
502
503    pub fn pid(&self) -> Option<u32> {
504        self.child
505            .lock()
506            .expect("child mutex poisoned")
507            .as_ref()
508            .map(|state| state.child.id())
509    }
510
511    pub fn returncode(&self) -> Option<i32> {
512        let v = self.shared.returncode.load(Ordering::Acquire);
513        if v == RETURNCODE_NOT_SET {
514            None
515        } else {
516            Some(v as i32)
517        }
518    }
519
520    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
521        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
522            return false;
523        }
524        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
525        match stream {
526            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
527            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
528        }
529    }
530
531    pub fn has_pending_combined(&self) -> bool {
532        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
533        !guard.combined_queue.is_empty()
534    }
535
536    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
537        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
538            return Vec::new();
539        }
540        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
541        let queue = match stream {
542            StreamKind::Stdout => &mut guard.stdout_queue,
543            StreamKind::Stderr => &mut guard.stderr_queue,
544        };
545        queue.drain(..).collect()
546    }
547
548    pub fn drain_combined(&self) -> Vec<StreamEvent> {
549        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
550        guard.combined_queue.drain(..).collect()
551    }
552
553    pub fn read_stream(
554        &self,
555        stream: StreamKind,
556        timeout: Option<Duration>,
557    ) -> ReadStatus<Vec<u8>> {
558        let deadline = timeout.map(|limit| Instant::now() + limit);
559        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
560
561        loop {
562            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
563                return ReadStatus::Eof;
564            }
565
566            let queue = match stream {
567                StreamKind::Stdout => &mut guard.stdout_queue,
568                StreamKind::Stderr => &mut guard.stderr_queue,
569            };
570            if let Some(line) = queue.pop_front() {
571                return ReadStatus::Line(line);
572            }
573
574            let closed = match stream {
575                StreamKind::Stdout => {
576                    if self.config.stderr_mode == StderrMode::Stdout {
577                        guard.stdout_closed && guard.stderr_closed
578                    } else {
579                        guard.stdout_closed
580                    }
581                }
582                StreamKind::Stderr => guard.stderr_closed,
583            };
584            if closed {
585                return ReadStatus::Eof;
586            }
587
588            match deadline {
589                Some(deadline) => {
590                    let now = Instant::now();
591                    if now >= deadline {
592                        return ReadStatus::Timeout;
593                    }
594                    let wait = deadline.saturating_duration_since(now);
595                    let result = self
596                        .shared
597                        .condvar
598                        .wait_timeout(guard, wait)
599                        .expect("queue mutex poisoned");
600                    guard = result.0;
601                    if result.1.timed_out() {
602                        return ReadStatus::Timeout;
603                    }
604                }
605                None => {
606                    guard = self
607                        .shared
608                        .condvar
609                        .wait(guard)
610                        .expect("queue mutex poisoned");
611                }
612            }
613        }
614    }
615
    // Preserve a stable Rust frame here in release user dumps.
    /// Read the next event from the combined queue, optionally bounded by
    /// `timeout`.
    ///
    /// Delegates to `public_symbols::rp_native_process_read_combined_public`,
    /// which presumably forwards to `read_combined_impl` — confirm in
    /// `public_symbols`.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
621
622    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
623        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
624        let deadline = timeout.map(|limit| Instant::now() + limit);
625        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
626
627        loop {
628            if let Some(event) = guard.combined_queue.pop_front() {
629                return ReadStatus::Line(event);
630            }
631            if guard.stdout_closed && guard.stderr_closed {
632                return ReadStatus::Eof;
633            }
634
635            match deadline {
636                Some(deadline) => {
637                    let now = Instant::now();
638                    if now >= deadline {
639                        return ReadStatus::Timeout;
640                    }
641                    let wait = deadline.saturating_duration_since(now);
642                    let result = self
643                        .shared
644                        .condvar
645                        .wait_timeout(guard, wait)
646                        .expect("queue mutex poisoned");
647                    guard = result.0;
648                    if result.1.timed_out() {
649                        return ReadStatus::Timeout;
650                    }
651                }
652                None => {
653                    guard = self
654                        .shared
655                        .condvar
656                        .wait(guard)
657                        .expect("queue mutex poisoned");
658                }
659            }
660        }
661    }
662
663    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
664        self.shared
665            .queues
666            .lock()
667            .expect("queue mutex poisoned")
668            .stdout_history
669            .clone()
670            .into_iter()
671            .collect()
672    }
673
674    fn captured_stdout_raw(&self) -> Vec<u8> {
675        self.shared
676            .queues
677            .lock()
678            .expect("queue mutex poisoned")
679            .stdout_raw
680            .clone()
681    }
682
683    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
684        if self.config.stderr_mode == StderrMode::Stdout {
685            return Vec::new();
686        }
687        self.shared
688            .queues
689            .lock()
690            .expect("queue mutex poisoned")
691            .stderr_history
692            .clone()
693            .into_iter()
694            .collect()
695    }
696
697    fn captured_stderr_raw(&self) -> Vec<u8> {
698        if self.config.stderr_mode == StderrMode::Stdout {
699            return Vec::new();
700        }
701        self.shared
702            .queues
703            .lock()
704            .expect("queue mutex poisoned")
705            .stderr_raw
706            .clone()
707    }
708
709    pub fn captured_combined(&self) -> Vec<StreamEvent> {
710        self.shared
711            .queues
712            .lock()
713            .expect("queue mutex poisoned")
714            .combined_history
715            .clone()
716            .into_iter()
717            .collect()
718    }
719
720    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
721        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
722            return 0;
723        }
724        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
725        match stream {
726            StreamKind::Stdout => guard.stdout_history_bytes,
727            StreamKind::Stderr => guard.stderr_history_bytes,
728        }
729    }
730
731    pub fn captured_combined_bytes(&self) -> usize {
732        self.shared
733            .queues
734            .lock()
735            .expect("queue mutex poisoned")
736            .combined_history_bytes
737    }
738
739    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
740        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
741            return 0;
742        }
743        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
744        match stream {
745            StreamKind::Stdout => {
746                let released = guard.stdout_history_bytes;
747                guard.stdout_history.clear();
748                guard.stdout_raw.clear();
749                guard.stdout_history_bytes = 0;
750                released
751            }
752            StreamKind::Stderr => {
753                let released = guard.stderr_history_bytes;
754                guard.stderr_history.clear();
755                guard.stderr_raw.clear();
756                guard.stderr_history_bytes = 0;
757                released
758            }
759        }
760    }
761
762    pub fn clear_captured_combined(&self) -> usize {
763        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
764        let released = guard.combined_history_bytes;
765        guard.combined_history.clear();
766        guard.combined_history_bytes = 0;
767        released
768    }
769
770    fn build_command(&self) -> Command {
771        let mut command = match &self.config.command {
772            CommandSpec::Shell(command) => shell_command(command),
773            CommandSpec::Argv(argv) => {
774                let mut command = Command::new(&argv[0]);
775                if argv.len() > 1 {
776                    command.args(&argv[1..]);
777                }
778                command
779            }
780        };
781        if let Some(cwd) = &self.config.cwd {
782            command.current_dir(cwd);
783        }
784        if let Some(env) = &self.config.env {
785            command.env_clear();
786            command.envs(env.iter().map(|(k, v)| (k, v)));
787        }
788        #[cfg(windows)]
789        {
790            use std::os::windows::process::CommandExt;
791
792            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
793            // with CTRL_BREAK_EVENT route to this child's group
794            // (rather than the daemon's group) — required for the
795            // pipe-session soft-signal path on Windows (#130 M4).
796            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
797            let extra = if self.config.create_process_group {
798                CREATE_NEW_PROCESS_GROUP
799            } else {
800                0
801            };
802            let flags = self.config.creationflags.unwrap_or(0)
803                | extra
804                | windows_priority_flags(self.config.nice);
805            if flags != 0 {
806                command.creation_flags(flags);
807            }
808        }
809        #[cfg(unix)]
810        {
811            let create_process_group = self.config.create_process_group;
812            let nice = self.config.nice;
813
814            if create_process_group || nice.is_some() {
815                use std::os::unix::process::CommandExt;
816
817                unsafe {
818                    command.pre_exec(move || {
819                        if create_process_group && libc::setpgid(0, 0) == -1 {
820                            return Err(std::io::Error::last_os_error());
821                        }
822                        if let Some(nice) = nice {
823                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
824                            if result == -1 {
825                                return Err(std::io::Error::last_os_error());
826                            }
827                        }
828                        Ok(())
829                    });
830                }
831            }
832        }
833        command
834    }
835
836    fn spawn_reader<R>(
837        &self,
838        pipe: R,
839        source_stream: StreamKind,
840        visible_stream: StreamKind,
841        on_pipe_done: Box<dyn FnOnce() + Send>,
842    ) where
843        R: Read + Send + 'static,
844    {
845        let shared = Arc::clone(&self.shared);
846        thread::spawn(move || {
847            let mut reader = pipe;
848            let mut chunk = vec![0_u8; 65536];
849            let mut pending = Vec::new();
850
851            loop {
852                match reader.read(&mut chunk) {
853                    Ok(0) => break,
854                    Ok(n) => {
855                        append_raw(&shared, visible_stream, &chunk[..n]);
856                        let lines = feed_chunk(&mut pending, &chunk[..n]);
857                        emit_lines(&shared, visible_stream, lines);
858                    }
859                    Err(_) => break,
860                }
861            }
862
863            if !pending.is_empty() {
864                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
865            }
866
867            // Clear the parent-side pipe-handle slot under its mutex
868            // before dropping the reader. After this returns,
869            // `kill_impl` can no longer try to `CancelIoEx` on us, so
870            // it's safe for `reader`'s drop to close the HANDLE.
871            on_pipe_done();
872            drop(reader);
873
874            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
875            match source_stream {
876                StreamKind::Stdout => guard.stdout_closed = true,
877                StreamKind::Stderr => guard.stderr_closed = true,
878            }
879            shared.condvar.notify_all();
880        });
881    }
882
883    #[cfg(windows)]
884    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
885        let handles = Arc::clone(&self.capture_pipe_handles);
886        Box::new(move || {
887            let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
888            match stream {
889                StreamKind::Stdout => guard.stdout = None,
890                StreamKind::Stderr => guard.stderr = None,
891            }
892        })
893    }
894
895    #[cfg(not(windows))]
896    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
897        Box::new(|| {})
898    }
899
900    /// Cancel any pending blocking `read()` on the parent-side capture
901    /// pipes so the reader threads' `read()` calls return
902    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
903    /// break the grandchild-orphan deadlock without waiting on
904    /// `wait_for_capture_completion_with_deadline`'s safety-net.
905    #[cfg(windows)]
906    fn cancel_capture_io(&self) {
907        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
908        use winapi::shared::ntdef::HANDLE;
909        use winapi::um::ioapiset::CancelIoEx;
910        let guard = self
911            .capture_pipe_handles
912            .lock()
913            .expect("capture pipe handles mutex poisoned");
914        if let Some(h) = guard.stdout {
915            // SAFETY: the slot is `Some` only while the owning reader
916            // thread still holds the `ChildStdout`, so the HANDLE is
917            // valid for the duration of this call. The reader is
918            // blocked in `lock()` on the same mutex if it's racing us
919            // toward exit, so it cannot drop the pipe and close the
920            // HANDLE until we return.
921            unsafe {
922                CancelIoEx(h as HANDLE, std::ptr::null_mut());
923            }
924        }
925        if let Some(h) = guard.stderr {
926            unsafe {
927                CancelIoEx(h as HANDLE, std::ptr::null_mut());
928            }
929        }
930    }
931
932    fn set_returncode(&self, code: i32) {
933        self.shared.returncode.store(code as i64, Ordering::Release);
934        self.shared.condvar.notify_all();
935    }
936
937    fn wait_for_capture_completion_impl(&self) {
938        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
939        if !self.config.capture {
940            return;
941        }
942
943        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
944        while !(guard.stdout_closed && guard.stderr_closed) {
945            guard = self
946                .shared
947                .condvar
948                .wait(guard)
949                .expect("queue mutex poisoned");
950        }
951    }
952
953    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
954    /// Returns `true` if the reader threads flipped both closed flags on
955    /// their own, `false` if the deadline elapsed first. On timeout the
956    /// closed flags are force-set (and waiters notified) so downstream
957    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
958    /// thread that eventually unblocks after the OS releases the pipe
959    /// will assign `closed = true` again, which is a harmless no-op.
960    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
961        crate::rp_rust_debug_scope!(
962            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
963        );
964        if !self.config.capture {
965            return true;
966        }
967
968        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
969        while !(guard.stdout_closed && guard.stderr_closed) {
970            let now = Instant::now();
971            if now >= deadline {
972                guard.stdout_closed = true;
973                guard.stderr_closed = true;
974                self.shared.condvar.notify_all();
975                return false;
976            }
977            let (next_guard, result) = self
978                .shared
979                .condvar
980                .wait_timeout(guard, deadline - now)
981                .expect("queue mutex poisoned");
982            guard = next_guard;
983            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
984                guard.stdout_closed = true;
985                guard.stderr_closed = true;
986                self.shared.condvar.notify_all();
987                return false;
988            }
989        }
990        true
991    }
992}
993
994fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
995    if lines.is_empty() {
996        return;
997    }
998    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
999    for line in lines {
1000        let line_len = line.len();
1001        match stream {
1002            StreamKind::Stdout => {
1003                guard.stdout_history_bytes += line_len;
1004                guard.stdout_history.push_back(line.clone());
1005                guard.stdout_queue.push_back(line.clone());
1006            }
1007            StreamKind::Stderr => {
1008                guard.stderr_history_bytes += line_len;
1009                guard.stderr_history.push_back(line.clone());
1010                guard.stderr_queue.push_back(line.clone());
1011            }
1012        }
1013        let event = StreamEvent { stream, line };
1014        guard.combined_history_bytes += line_len;
1015        guard.combined_history.push_back(event.clone());
1016        guard.combined_queue.push_back(event);
1017    }
1018    shared.condvar.notify_all();
1019}
1020
1021fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1022    if chunk.is_empty() {
1023        return;
1024    }
1025    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1026    match stream {
1027        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1028        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1029    }
1030}
1031
1032/// Run a command to completion while concurrently draining stdout and stderr.
1033///
1034/// The helper forces capture on regardless of `config.capture`, returns raw
1035/// stdout/stderr bytes, and kills the child before returning
1036/// [`ProcessError::Timeout`] when `timeout` elapses.
1037pub fn run_command(
1038    mut config: ProcessConfig,
1039    timeout: Option<Duration>,
1040) -> Result<RunOutput, ProcessError> {
1041    config.capture = true;
1042    let process = NativeProcess::new(config);
1043    process.start()?;
1044
1045    let exit_code = match process.wait(timeout) {
1046        Ok(code) => code,
1047        Err(ProcessError::Timeout) => {
1048            match process.kill() {
1049                Ok(()) | Err(ProcessError::NotRunning) => {}
1050                Err(error) => return Err(error),
1051            }
1052            return Err(ProcessError::Timeout);
1053        }
1054        Err(error) => return Err(error),
1055    };
1056
1057    Ok(RunOutput {
1058        stdout: process.captured_stdout_raw(),
1059        stderr: process.captured_stderr_raw(),
1060        exit_code,
1061    })
1062}
1063
/// Wraps `command` in the platform shell invocation.
///
/// - Windows: `cmd /D /S /C "<command>"`, with the command passed through
///   `raw_arg` so it is not re-quoted.
/// - Elsewhere: `sh -lc <command>`, with the command as a single argument.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1082
1083#[cfg(test)]
1084mod tests;