Skip to main content

running_process/
lib.rs

1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
14// Wave 3+4 of #165: proto module + IPC client absorbed from the
15// former `running-process-proto` and `running-process-client` crates.
16// Both gated behind `feature = "client"`. The protobuf package
17// `running_process.daemon.v1` compiles to the file referenced below.
18#[cfg(feature = "client")]
19pub mod proto {
20    pub mod daemon {
21        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
22    }
23}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28// Lightweight tee sink primitives for callers that want transcript/log
29// fan-out without pulling in the full daemon runtime.
30#[cfg(feature = "telemetry")]
31#[path = "daemon/telemetry.rs"]
32pub mod telemetry;
33
34// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
35// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
36#[cfg(feature = "daemon")]
37pub mod daemon;
38pub mod pty;
39mod public_symbols;
40mod rust_debug;
41pub mod spawn;
42mod types;
43#[cfg(unix)]
44mod unix;
45#[cfg(windows)]
46mod windows;
47
48pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
49pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
50#[cfg(feature = "originator-scan")]
51pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
52pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
53pub use spawn::{
54    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
55    StdioSource,
56};
57pub use types::{
58    CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
59    StreamEvent, StreamKind,
60};
61
62pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
63#[cfg(unix)]
64pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
65#[cfg(windows)]
66pub(crate) use windows::{
67    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
68    WindowsJobHandle,
69};
70
71#[macro_export]
72macro_rules! rp_rust_debug_scope {
73    ($label:expr) => {
74        let _running_process_rust_debug_scope =
75            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
76    };
77}
78
79#[derive(Default)]
80struct QueueState {
81    stdout_queue: VecDeque<Vec<u8>>,
82    stderr_queue: VecDeque<Vec<u8>>,
83    combined_queue: VecDeque<StreamEvent>,
84    stdout_history: VecDeque<Vec<u8>>,
85    stderr_history: VecDeque<Vec<u8>>,
86    combined_history: VecDeque<StreamEvent>,
87    stdout_raw: Vec<u8>,
88    stderr_raw: Vec<u8>,
89    stdout_history_bytes: usize,
90    stderr_history_bytes: usize,
91    combined_history_bytes: usize,
92    stdout_closed: bool,
93    stderr_closed: bool,
94}
95
96/// Sentinel value for returncode atomic: process has not exited yet.
97const RETURNCODE_NOT_SET: i64 = i64::MIN;
98
99struct SharedState {
100    queues: Mutex<QueueState>,
101    condvar: Condvar,
102    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
103    /// Updated by a background waiter thread — reading is lock-free.
104    returncode: AtomicI64,
105}
106
107struct ChildState {
108    child: Child,
109    #[cfg(windows)]
110    _job: WindowsJobHandle,
111}
112
113impl SharedState {
114    fn new(capture: bool) -> Self {
115        let queues = QueueState {
116            stdout_closed: !capture,
117            stderr_closed: !capture,
118            ..QueueState::default()
119        };
120        Self {
121            queues: Mutex::new(queues),
122            condvar: Condvar::new(),
123            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
124        }
125    }
126}
127
128pub struct NativeProcess {
129    config: ProcessConfig,
130    child: Arc<Mutex<Option<ChildState>>>,
131    shared: Arc<SharedState>,
132    #[cfg(windows)]
133    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
134}
135
136impl NativeProcess {
137    pub fn new(config: ProcessConfig) -> Self {
138        Self {
139            shared: Arc::new(SharedState::new(config.capture)),
140            child: Arc::new(Mutex::new(None)),
141            config,
142            #[cfg(windows)]
143            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
144        }
145    }
146
147    // Preserve a stable Rust frame here in release user dumps.
148    #[inline(never)]
149    pub fn start(&self) -> Result<(), ProcessError> {
150        public_symbols::rp_native_process_start_public(self)
151    }
152
153    fn start_impl(&self) -> Result<(), ProcessError> {
154        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
155        let mut guard = self.child.lock().expect("child mutex poisoned");
156        if guard.is_some() {
157            return Err(ProcessError::AlreadyStarted);
158        }
159
160        let mut command = self.build_command();
161        match self.config.stdin_mode {
162            StdinMode::Inherit => {}
163            StdinMode::Piped => {
164                command.stdin(Stdio::piped());
165            }
166            StdinMode::Null => {
167                command.stdin(Stdio::null());
168            }
169        }
170        if self.config.capture {
171            command.stdout(Stdio::piped());
172            command.stderr(Stdio::piped());
173        }
174
175        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
176        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
177        #[cfg(windows)]
178        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
179            .map_err(ProcessError::Spawn)?;
180        if self.config.capture {
181            let stdout = child.stdout.take().expect("stdout pipe missing");
182            let stderr = child.stderr.take().expect("stderr pipe missing");
183            #[cfg(windows)]
184            {
185                use std::os::windows::io::AsRawHandle;
186                let mut handles = self
187                    .capture_pipe_handles
188                    .lock()
189                    .expect("capture pipe handles mutex poisoned");
190                handles.stdout = Some(stdout.as_raw_handle() as usize);
191                handles.stderr = Some(stderr.as_raw_handle() as usize);
192            }
193            self.spawn_reader(
194                stdout,
195                StreamKind::Stdout,
196                StreamKind::Stdout,
197                self.pipe_done_callback(StreamKind::Stdout),
198            );
199            self.spawn_reader(
200                stderr,
201                StreamKind::Stderr,
202                match self.config.stderr_mode {
203                    StderrMode::Stdout => StreamKind::Stdout,
204                    StderrMode::Pipe => StreamKind::Stderr,
205                },
206                self.pipe_done_callback(StreamKind::Stderr),
207            );
208        }
209        *guard = Some(ChildState {
210            child,
211            #[cfg(windows)]
212            _job: job,
213        });
214        drop(guard);
215        self.spawn_exit_waiter();
216        Ok(())
217    }
218
219    /// Background thread that polls for process exit and stores the exit code
220    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
221    fn spawn_exit_waiter(&self) {
222        let child = Arc::clone(&self.child);
223        let shared = Arc::clone(&self.shared);
224        thread::spawn(move || loop {
225            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
226                return;
227            }
228            {
229                let mut guard = child.lock().expect("child mutex poisoned");
230                if let Some(child_state) = guard.as_mut() {
231                    match child_state.child.try_wait() {
232                        Ok(Some(status)) => {
233                            let code = exit_code(status);
234                            shared.returncode.store(code as i64, Ordering::Release);
235                            shared.condvar.notify_all();
236                            return;
237                        }
238                        Ok(None) => {}
239                        Err(_) => return,
240                    }
241                } else {
242                    return;
243                }
244            }
245            // #199: intentional — capture thread polling for
246            // child-exit. `try_wait` is non-blocking by design;
247            // we can't block here because the thread also drains
248            // pipe state alongside the exit check. 10ms keeps the
249            // CPU cost negligible while staying responsive.
250            thread::sleep(Duration::from_millis(10));
251        });
252    }
253
254    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
255        let mut guard = self.child.lock().expect("child mutex poisoned");
256        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
257        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
258        use std::io::Write;
259        stdin.write_all(data).map_err(ProcessError::Io)?;
260        stdin.flush().map_err(ProcessError::Io)?;
261        drop(child.stdin.take());
262        Ok(())
263    }
264
265    /// Write to the child's stdin without closing it afterwards, so the
266    /// caller can issue additional writes. Used by interactive
267    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
268    /// stdin open across multiple client input frames.
269    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
270        let mut guard = self.child.lock().expect("child mutex poisoned");
271        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
272        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
273        use std::io::Write;
274        stdin.write_all(data).map_err(ProcessError::Io)?;
275        stdin.flush().map_err(ProcessError::Io)?;
276        Ok(())
277    }
278
279    /// Explicitly close the child's stdin (signals EOF to the child).
280    /// Idempotent: returns Ok if stdin was already closed.
281    pub fn close_stdin(&self) -> Result<(), ProcessError> {
282        let mut guard = self.child.lock().expect("child mutex poisoned");
283        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
284        drop(child.stdin.take());
285        Ok(())
286    }
287
288    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
289        // Fast path: check atomic set by background waiter thread.
290        if let Some(code) = self.returncode() {
291            return Ok(Some(code));
292        }
293        let mut guard = self.child.lock().expect("child mutex poisoned");
294        let Some(child_state) = guard.as_mut() else {
295            return Ok(self.returncode());
296        };
297        let child = &mut child_state.child;
298        let status = child.try_wait().map_err(ProcessError::Io)?;
299        if let Some(status) = status {
300            let code = exit_code(status);
301            self.set_returncode(code);
302            return Ok(Some(code));
303        }
304        Ok(None)
305    }
306
307    // Preserve a stable Rust frame here in release user dumps.
308    #[inline(never)]
309    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
310        public_symbols::rp_native_process_wait_public(self, timeout)
311    }
312
313    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
314        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
315        if self.child.lock().expect("child mutex poisoned").is_none() {
316            return self.returncode().ok_or(ProcessError::NotRunning);
317        }
318        // Fast path: already exited.
319        if let Some(code) = self.returncode() {
320            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
321            return Ok(code);
322        }
323        let start = Instant::now();
324        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
325        loop {
326            // Check returncode (set by exit-waiter thread via atomic + condvar).
327            let rc = self.shared.returncode.load(Ordering::Acquire);
328            if rc != RETURNCODE_NOT_SET {
329                drop(guard);
330                let code = rc as i32;
331                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
332                return Ok(code);
333            }
334            if let Some(limit) = timeout {
335                let elapsed = start.elapsed();
336                if elapsed >= limit {
337                    return Err(ProcessError::Timeout);
338                }
339                let remaining = limit - elapsed;
340                // Wait on condvar with timeout, capped at 50ms to recheck.
341                let wait_time = remaining.min(Duration::from_millis(50));
342                guard = self
343                    .shared
344                    .condvar
345                    .wait_timeout(guard, wait_time)
346                    .expect("queue mutex poisoned")
347                    .0;
348            } else {
349                // Wait on condvar with periodic recheck.
350                guard = self
351                    .shared
352                    .condvar
353                    .wait_timeout(guard, Duration::from_millis(50))
354                    .expect("queue mutex poisoned")
355                    .0;
356            }
357        }
358    }
359
360    // Preserve a stable Rust frame here in release user dumps.
361    #[inline(never)]
362    pub fn kill(&self) -> Result<(), ProcessError> {
363        public_symbols::rp_native_process_kill_public(self)
364    }
365
366    fn kill_impl(&self) -> Result<(), ProcessError> {
367        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
368        {
369            let mut guard = self.child.lock().expect("child mutex poisoned");
370            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
371            child.kill().map_err(ProcessError::Io)?;
372            let status = child.wait().map_err(ProcessError::Io)?;
373            self.set_returncode(exit_code(status));
374        }
375        // On Windows, interrupt any pending blocking `read()` in the
376        // per-stream reader threads so they fall out of their loops
377        // immediately. This is what makes the grandchild-pipe-orphan
378        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
379        // that inherits the pipe and outlives uv) wake up in
380        // microseconds instead of waiting for the bounded-drain
381        // safety-net deadline below.
382        #[cfg(windows)]
383        self.cancel_capture_io();
384        // Synchronize with the per-stream reader threads so that by the
385        // time kill() returns, the capture queues have flipped from
386        // "blocked on read" to "closed" and downstream pollers (e.g.
387        // take_combined_line) observe EOS instead of timeout. Without
388        // this, callers that hit a wait()-timeout path see Python code
389        // raise TimeoutError, kill the child, then race the reader
390        // threads — a 10ms poll loop can miss the EOS flip entirely.
391        //
392        // The deadline is the safety-net: on Windows `CancelIoEx`
393        // above almost always wakes the readers first; on POSIX, and
394        // in any pathological Windows case where `CancelIoEx` doesn't
395        // fire, the deadline guarantees `kill()` still returns.
396        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
397            self,
398            kill_drain_deadline(),
399        );
400        Ok(())
401    }
402
403    pub fn terminate(&self) -> Result<(), ProcessError> {
404        self.kill()
405    }
406
407    /// Send the OS-appropriate soft termination signal to the child's
408    /// process group (POSIX: SIGTERM to `-pid`; Windows: no soft path
409    /// implemented yet — returns Ok without doing anything so callers
410    /// can run the same code on both platforms and rely on the post-
411    /// grace hard kill).
412    ///
413    /// Requires `ProcessConfig.create_process_group=true` on POSIX so
414    /// that `-pid` resolves to the child's own group. With the default
415    /// `create_process_group=false`, the kill would walk back to the
416    /// caller's group; the method silently no-ops in that case to avoid
417    /// signaling the wrong tree.
418    ///
419    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
420    /// that `TerminationOutcome::SoftExit` becomes meaningful on POSIX.
421    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
422        #[cfg(unix)]
423        {
424            if !self.config.create_process_group {
425                return Ok(());
426            }
427            let pid = match self.pid() {
428                Some(p) => p as i32,
429                None => return Err(ProcessError::NotRunning),
430            };
431            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
432            if result != 0 {
433                let err = std::io::Error::last_os_error();
434                if err.raw_os_error() != Some(libc::ESRCH) {
435                    return Err(ProcessError::Io(err));
436                }
437            }
438            Ok(())
439        }
440        #[cfg(windows)]
441        {
442            if !self.config.create_process_group {
443                // GenerateConsoleCtrlEvent only routes to children
444                // spawned with CREATE_NEW_PROCESS_GROUP, and the
445                // event would otherwise hit the daemon's own group.
446                // No-op so the hard-kill schedule still wins.
447                return Ok(());
448            }
449            let pid = match self.pid() {
450                Some(p) => p,
451                None => return Err(ProcessError::NotRunning),
452            };
453            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
454            // SAFETY: the FFI call is the standard Windows API; no
455            // borrowed Rust state is involved.
456            let ok = unsafe {
457                winapi::um::wincon::GenerateConsoleCtrlEvent(
458                    winapi::um::wincon::CTRL_BREAK_EVENT,
459                    pid,
460                )
461            };
462            if ok == 0 {
463                let err = std::io::Error::last_os_error();
464                // ERROR_INVALID_HANDLE means the child has already
465                // exited or has detached from the console — treat as
466                // success because the soft step's only goal is to
467                // give the child a chance to exit cleanly, and a
468                // dead/detached child does not need one.
469                if err.raw_os_error() != Some(6) {
470                    return Err(ProcessError::Io(err));
471                }
472            }
473            Ok(())
474        }
475    }
476
477    // Preserve a stable Rust frame here in release user dumps.
478    #[inline(never)]
479    pub fn close(&self) -> Result<(), ProcessError> {
480        public_symbols::rp_native_process_close_public(self)
481    }
482
483    fn close_impl(&self) -> Result<(), ProcessError> {
484        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
485        if self.child.lock().expect("child mutex poisoned").is_none() {
486            return Ok(());
487        }
488        if self.poll()?.is_none() {
489            self.kill()?;
490        } else {
491            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
492        }
493        Ok(())
494    }
495
496    pub fn pid(&self) -> Option<u32> {
497        self.child
498            .lock()
499            .expect("child mutex poisoned")
500            .as_ref()
501            .map(|state| state.child.id())
502    }
503
504    pub fn returncode(&self) -> Option<i32> {
505        let v = self.shared.returncode.load(Ordering::Acquire);
506        if v == RETURNCODE_NOT_SET {
507            None
508        } else {
509            Some(v as i32)
510        }
511    }
512
513    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
514        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
515            return false;
516        }
517        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
518        match stream {
519            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
520            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
521        }
522    }
523
524    pub fn has_pending_combined(&self) -> bool {
525        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
526        !guard.combined_queue.is_empty()
527    }
528
529    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
530        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
531            return Vec::new();
532        }
533        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
534        let queue = match stream {
535            StreamKind::Stdout => &mut guard.stdout_queue,
536            StreamKind::Stderr => &mut guard.stderr_queue,
537        };
538        queue.drain(..).collect()
539    }
540
541    pub fn drain_combined(&self) -> Vec<StreamEvent> {
542        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
543        guard.combined_queue.drain(..).collect()
544    }
545
546    pub fn read_stream(
547        &self,
548        stream: StreamKind,
549        timeout: Option<Duration>,
550    ) -> ReadStatus<Vec<u8>> {
551        let deadline = timeout.map(|limit| Instant::now() + limit);
552        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
553
554        loop {
555            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
556                return ReadStatus::Eof;
557            }
558
559            let queue = match stream {
560                StreamKind::Stdout => &mut guard.stdout_queue,
561                StreamKind::Stderr => &mut guard.stderr_queue,
562            };
563            if let Some(line) = queue.pop_front() {
564                return ReadStatus::Line(line);
565            }
566
567            let closed = match stream {
568                StreamKind::Stdout => {
569                    if self.config.stderr_mode == StderrMode::Stdout {
570                        guard.stdout_closed && guard.stderr_closed
571                    } else {
572                        guard.stdout_closed
573                    }
574                }
575                StreamKind::Stderr => guard.stderr_closed,
576            };
577            if closed {
578                return ReadStatus::Eof;
579            }
580
581            match deadline {
582                Some(deadline) => {
583                    let now = Instant::now();
584                    if now >= deadline {
585                        return ReadStatus::Timeout;
586                    }
587                    let wait = deadline.saturating_duration_since(now);
588                    let result = self
589                        .shared
590                        .condvar
591                        .wait_timeout(guard, wait)
592                        .expect("queue mutex poisoned");
593                    guard = result.0;
594                    if result.1.timed_out() {
595                        return ReadStatus::Timeout;
596                    }
597                }
598                None => {
599                    guard = self
600                        .shared
601                        .condvar
602                        .wait(guard)
603                        .expect("queue mutex poisoned");
604                }
605            }
606        }
607    }
608
609    // Preserve a stable Rust frame here in release user dumps.
610    #[inline(never)]
611    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
612        public_symbols::rp_native_process_read_combined_public(self, timeout)
613    }
614
615    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
616        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
617        let deadline = timeout.map(|limit| Instant::now() + limit);
618        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
619
620        loop {
621            if let Some(event) = guard.combined_queue.pop_front() {
622                return ReadStatus::Line(event);
623            }
624            if guard.stdout_closed && guard.stderr_closed {
625                return ReadStatus::Eof;
626            }
627
628            match deadline {
629                Some(deadline) => {
630                    let now = Instant::now();
631                    if now >= deadline {
632                        return ReadStatus::Timeout;
633                    }
634                    let wait = deadline.saturating_duration_since(now);
635                    let result = self
636                        .shared
637                        .condvar
638                        .wait_timeout(guard, wait)
639                        .expect("queue mutex poisoned");
640                    guard = result.0;
641                    if result.1.timed_out() {
642                        return ReadStatus::Timeout;
643                    }
644                }
645                None => {
646                    guard = self
647                        .shared
648                        .condvar
649                        .wait(guard)
650                        .expect("queue mutex poisoned");
651                }
652            }
653        }
654    }
655
656    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
657        self.shared
658            .queues
659            .lock()
660            .expect("queue mutex poisoned")
661            .stdout_history
662            .clone()
663            .into_iter()
664            .collect()
665    }
666
667    fn captured_stdout_raw(&self) -> Vec<u8> {
668        self.shared
669            .queues
670            .lock()
671            .expect("queue mutex poisoned")
672            .stdout_raw
673            .clone()
674    }
675
676    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
677        if self.config.stderr_mode == StderrMode::Stdout {
678            return Vec::new();
679        }
680        self.shared
681            .queues
682            .lock()
683            .expect("queue mutex poisoned")
684            .stderr_history
685            .clone()
686            .into_iter()
687            .collect()
688    }
689
690    fn captured_stderr_raw(&self) -> Vec<u8> {
691        if self.config.stderr_mode == StderrMode::Stdout {
692            return Vec::new();
693        }
694        self.shared
695            .queues
696            .lock()
697            .expect("queue mutex poisoned")
698            .stderr_raw
699            .clone()
700    }
701
702    pub fn captured_combined(&self) -> Vec<StreamEvent> {
703        self.shared
704            .queues
705            .lock()
706            .expect("queue mutex poisoned")
707            .combined_history
708            .clone()
709            .into_iter()
710            .collect()
711    }
712
713    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
714        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
715            return 0;
716        }
717        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
718        match stream {
719            StreamKind::Stdout => guard.stdout_history_bytes,
720            StreamKind::Stderr => guard.stderr_history_bytes,
721        }
722    }
723
724    pub fn captured_combined_bytes(&self) -> usize {
725        self.shared
726            .queues
727            .lock()
728            .expect("queue mutex poisoned")
729            .combined_history_bytes
730    }
731
732    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
733        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
734            return 0;
735        }
736        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
737        match stream {
738            StreamKind::Stdout => {
739                let released = guard.stdout_history_bytes;
740                guard.stdout_history.clear();
741                guard.stdout_raw.clear();
742                guard.stdout_history_bytes = 0;
743                released
744            }
745            StreamKind::Stderr => {
746                let released = guard.stderr_history_bytes;
747                guard.stderr_history.clear();
748                guard.stderr_raw.clear();
749                guard.stderr_history_bytes = 0;
750                released
751            }
752        }
753    }
754
755    pub fn clear_captured_combined(&self) -> usize {
756        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
757        let released = guard.combined_history_bytes;
758        guard.combined_history.clear();
759        guard.combined_history_bytes = 0;
760        released
761    }
762
763    fn build_command(&self) -> Command {
764        let mut command = match &self.config.command {
765            CommandSpec::Shell(command) => shell_command(command),
766            CommandSpec::Argv(argv) => {
767                let mut command = Command::new(&argv[0]);
768                if argv.len() > 1 {
769                    command.args(&argv[1..]);
770                }
771                command
772            }
773        };
774        if let Some(cwd) = &self.config.cwd {
775            command.current_dir(cwd);
776        }
777        if let Some(env) = &self.config.env {
778            command.env_clear();
779            command.envs(env.iter().map(|(k, v)| (k, v)));
780        }
781        #[cfg(windows)]
782        {
783            use std::os::windows::process::CommandExt;
784
785            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
786            // with CTRL_BREAK_EVENT route to this child's group
787            // (rather than the daemon's group) — required for the
788            // pipe-session soft-signal path on Windows (#130 M4).
789            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
790            let extra = if self.config.create_process_group {
791                CREATE_NEW_PROCESS_GROUP
792            } else {
793                0
794            };
795            let flags = self.config.creationflags.unwrap_or(0)
796                | extra
797                | windows_priority_flags(self.config.nice);
798            if flags != 0 {
799                command.creation_flags(flags);
800            }
801        }
802        #[cfg(unix)]
803        {
804            let create_process_group = self.config.create_process_group;
805            let nice = self.config.nice;
806
807            if create_process_group || nice.is_some() {
808                use std::os::unix::process::CommandExt;
809
810                unsafe {
811                    command.pre_exec(move || {
812                        if create_process_group && libc::setpgid(0, 0) == -1 {
813                            return Err(std::io::Error::last_os_error());
814                        }
815                        if let Some(nice) = nice {
816                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
817                            if result == -1 {
818                                return Err(std::io::Error::last_os_error());
819                            }
820                        }
821                        Ok(())
822                    });
823                }
824            }
825        }
826        command
827    }
828
829    fn spawn_reader<R>(
830        &self,
831        pipe: R,
832        source_stream: StreamKind,
833        visible_stream: StreamKind,
834        on_pipe_done: Box<dyn FnOnce() + Send>,
835    ) where
836        R: Read + Send + 'static,
837    {
838        let shared = Arc::clone(&self.shared);
839        thread::spawn(move || {
840            let mut reader = pipe;
841            let mut chunk = vec![0_u8; 65536];
842            let mut pending = Vec::new();
843
844            loop {
845                match reader.read(&mut chunk) {
846                    Ok(0) => break,
847                    Ok(n) => {
848                        append_raw(&shared, visible_stream, &chunk[..n]);
849                        let lines = feed_chunk(&mut pending, &chunk[..n]);
850                        emit_lines(&shared, visible_stream, lines);
851                    }
852                    Err(_) => break,
853                }
854            }
855
856            if !pending.is_empty() {
857                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
858            }
859
860            // Clear the parent-side pipe-handle slot under its mutex
861            // before dropping the reader. After this returns,
862            // `kill_impl` can no longer try to `CancelIoEx` on us, so
863            // it's safe for `reader`'s drop to close the HANDLE.
864            on_pipe_done();
865            drop(reader);
866
867            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
868            match source_stream {
869                StreamKind::Stdout => guard.stdout_closed = true,
870                StreamKind::Stderr => guard.stderr_closed = true,
871            }
872            shared.condvar.notify_all();
873        });
874    }
875
876    #[cfg(windows)]
877    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
878        let handles = Arc::clone(&self.capture_pipe_handles);
879        Box::new(move || {
880            let mut guard = handles
881                .lock()
882                .expect("capture pipe handles mutex poisoned");
883            match stream {
884                StreamKind::Stdout => guard.stdout = None,
885                StreamKind::Stderr => guard.stderr = None,
886            }
887        })
888    }
889
890    #[cfg(not(windows))]
891    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
892        Box::new(|| {})
893    }
894
895    /// Cancel any pending blocking `read()` on the parent-side capture
896    /// pipes so the reader threads' `read()` calls return
897    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
898    /// break the grandchild-orphan deadlock without waiting on
899    /// `wait_for_capture_completion_with_deadline`'s safety-net.
900    #[cfg(windows)]
901    fn cancel_capture_io(&self) {
902        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
903        use winapi::shared::ntdef::HANDLE;
904        use winapi::um::ioapiset::CancelIoEx;
905        let guard = self
906            .capture_pipe_handles
907            .lock()
908            .expect("capture pipe handles mutex poisoned");
909        if let Some(h) = guard.stdout {
910            // SAFETY: the slot is `Some` only while the owning reader
911            // thread still holds the `ChildStdout`, so the HANDLE is
912            // valid for the duration of this call. The reader is
913            // blocked in `lock()` on the same mutex if it's racing us
914            // toward exit, so it cannot drop the pipe and close the
915            // HANDLE until we return.
916            unsafe {
917                CancelIoEx(h as HANDLE, std::ptr::null_mut());
918            }
919        }
920        if let Some(h) = guard.stderr {
921            unsafe {
922                CancelIoEx(h as HANDLE, std::ptr::null_mut());
923            }
924        }
925    }
926
927    fn set_returncode(&self, code: i32) {
928        self.shared.returncode.store(code as i64, Ordering::Release);
929        self.shared.condvar.notify_all();
930    }
931
932    fn wait_for_capture_completion_impl(&self) {
933        crate::rp_rust_debug_scope!(
934            "running_process::NativeProcess::wait_for_capture_completion"
935        );
936        if !self.config.capture {
937            return;
938        }
939
940        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
941        while !(guard.stdout_closed && guard.stderr_closed) {
942            guard = self
943                .shared
944                .condvar
945                .wait(guard)
946                .expect("queue mutex poisoned");
947        }
948    }
949
950    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
951    /// Returns `true` if the reader threads flipped both closed flags on
952    /// their own, `false` if the deadline elapsed first. On timeout the
953    /// closed flags are force-set (and waiters notified) so downstream
954    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
955    /// thread that eventually unblocks after the OS releases the pipe
956    /// will assign `closed = true` again, which is a harmless no-op.
957    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
958        crate::rp_rust_debug_scope!(
959            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
960        );
961        if !self.config.capture {
962            return true;
963        }
964
965        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
966        while !(guard.stdout_closed && guard.stderr_closed) {
967            let now = Instant::now();
968            if now >= deadline {
969                guard.stdout_closed = true;
970                guard.stderr_closed = true;
971                self.shared.condvar.notify_all();
972                return false;
973            }
974            let (next_guard, result) = self
975                .shared
976                .condvar
977                .wait_timeout(guard, deadline - now)
978                .expect("queue mutex poisoned");
979            guard = next_guard;
980            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
981                guard.stdout_closed = true;
982                guard.stderr_closed = true;
983                self.shared.condvar.notify_all();
984                return false;
985            }
986        }
987        true
988    }
989}
990
991fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
992    if lines.is_empty() {
993        return;
994    }
995    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
996    for line in lines {
997        let line_len = line.len();
998        match stream {
999            StreamKind::Stdout => {
1000                guard.stdout_history_bytes += line_len;
1001                guard.stdout_history.push_back(line.clone());
1002                guard.stdout_queue.push_back(line.clone());
1003            }
1004            StreamKind::Stderr => {
1005                guard.stderr_history_bytes += line_len;
1006                guard.stderr_history.push_back(line.clone());
1007                guard.stderr_queue.push_back(line.clone());
1008            }
1009        }
1010        let event = StreamEvent { stream, line };
1011        guard.combined_history_bytes += line_len;
1012        guard.combined_history.push_back(event.clone());
1013        guard.combined_queue.push_back(event);
1014    }
1015    shared.condvar.notify_all();
1016}
1017
1018fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1019    if chunk.is_empty() {
1020        return;
1021    }
1022    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1023    match stream {
1024        StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1025        StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1026    }
1027}
1028
1029/// Run a command to completion while concurrently draining stdout and stderr.
1030///
1031/// The helper forces capture on regardless of `config.capture`, returns raw
1032/// stdout/stderr bytes, and kills the child before returning
1033/// [`ProcessError::Timeout`] when `timeout` elapses.
1034pub fn run_command(
1035    mut config: ProcessConfig,
1036    timeout: Option<Duration>,
1037) -> Result<RunOutput, ProcessError> {
1038    config.capture = true;
1039    let process = NativeProcess::new(config);
1040    process.start()?;
1041
1042    let exit_code = match process.wait(timeout) {
1043        Ok(code) => code,
1044        Err(ProcessError::Timeout) => {
1045            match process.kill() {
1046                Ok(()) | Err(ProcessError::NotRunning) => {}
1047                Err(error) => return Err(error),
1048            }
1049            return Err(ProcessError::Timeout);
1050        }
1051        Err(error) => return Err(error),
1052    };
1053
1054    Ok(RunOutput {
1055        stdout: process.captured_stdout_raw(),
1056        stderr: process.captured_stderr_raw(),
1057        exit_code,
1058    })
1059}
1060
1061pub(crate) fn shell_command(command: &str) -> Command {
1062    #[cfg(windows)]
1063    {
1064        use std::os::windows::process::CommandExt;
1065
1066        let mut cmd = Command::new("cmd");
1067        cmd.raw_arg("/D /S /C \"");
1068        cmd.raw_arg(command);
1069        cmd.raw_arg("\"");
1070        cmd
1071    }
1072    #[cfg(not(windows))]
1073    {
1074        let mut cmd = Command::new("sh");
1075        cmd.arg("-lc").arg(command);
1076        cmd
1077    }
1078}
1079
1080#[cfg(test)]
1081mod tests;