Skip to main content

running_process/
lib.rs

1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
14// Wave 3+4 of #165: proto module + IPC client absorbed from the
15// former `running-process-proto` and `running-process-client` crates.
16// Both gated behind `feature = "client"`. The protobuf package
17// `running_process.daemon.v1` compiles to the file referenced below.
18#[cfg(feature = "client")]
19pub mod proto {
20    pub mod daemon {
21        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
22    }
23}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28// Wave 5 of #165: daemon runtime absorbed from `running-process-daemon`.
29// Heavy deps (tokio, sqlite, etc.) gated behind `feature = "daemon"`.
30#[cfg(feature = "daemon")]
31pub mod daemon;
32pub mod pty;
33mod public_symbols;
34mod rust_debug;
35pub mod spawn;
36mod types;
37#[cfg(unix)]
38mod unix;
39#[cfg(windows)]
40mod windows;
41
42pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
43pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
44#[cfg(feature = "originator-scan")]
45pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
46pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
47pub use spawn::{
48    spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
49    StdioSource,
50};
51pub use types::{
52    CommandSpec, ProcessConfig, ProcessError, ReadStatus, StderrMode, StdinMode, StreamEvent,
53    StreamKind,
54};
55
56pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
57#[cfg(unix)]
58pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
59#[cfg(windows)]
60pub(crate) use windows::{
61    assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
62    WindowsJobHandle,
63};
64
65#[macro_export]
66macro_rules! rp_rust_debug_scope {
67    ($label:expr) => {
68        let _running_process_rust_debug_scope =
69            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
70    };
71}
72
73#[derive(Default)]
74struct QueueState {
75    stdout_queue: VecDeque<Vec<u8>>,
76    stderr_queue: VecDeque<Vec<u8>>,
77    combined_queue: VecDeque<StreamEvent>,
78    stdout_history: VecDeque<Vec<u8>>,
79    stderr_history: VecDeque<Vec<u8>>,
80    combined_history: VecDeque<StreamEvent>,
81    stdout_history_bytes: usize,
82    stderr_history_bytes: usize,
83    combined_history_bytes: usize,
84    stdout_closed: bool,
85    stderr_closed: bool,
86}
87
88/// Sentinel value for returncode atomic: process has not exited yet.
89const RETURNCODE_NOT_SET: i64 = i64::MIN;
90
91struct SharedState {
92    queues: Mutex<QueueState>,
93    condvar: Condvar,
94    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
95    /// Updated by a background waiter thread — reading is lock-free.
96    returncode: AtomicI64,
97}
98
99struct ChildState {
100    child: Child,
101    #[cfg(windows)]
102    _job: WindowsJobHandle,
103}
104
105impl SharedState {
106    fn new(capture: bool) -> Self {
107        let queues = QueueState {
108            stdout_closed: !capture,
109            stderr_closed: !capture,
110            ..QueueState::default()
111        };
112        Self {
113            queues: Mutex::new(queues),
114            condvar: Condvar::new(),
115            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
116        }
117    }
118}
119
120pub struct NativeProcess {
121    config: ProcessConfig,
122    child: Arc<Mutex<Option<ChildState>>>,
123    shared: Arc<SharedState>,
124    #[cfg(windows)]
125    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
126}
127
128impl NativeProcess {
129    pub fn new(config: ProcessConfig) -> Self {
130        Self {
131            shared: Arc::new(SharedState::new(config.capture)),
132            child: Arc::new(Mutex::new(None)),
133            config,
134            #[cfg(windows)]
135            capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
136        }
137    }
138
139    // Preserve a stable Rust frame here in release user dumps.
140    #[inline(never)]
141    pub fn start(&self) -> Result<(), ProcessError> {
142        public_symbols::rp_native_process_start_public(self)
143    }
144
145    fn start_impl(&self) -> Result<(), ProcessError> {
146        crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
147        let mut guard = self.child.lock().expect("child mutex poisoned");
148        if guard.is_some() {
149            return Err(ProcessError::AlreadyStarted);
150        }
151
152        let mut command = self.build_command();
153        match self.config.stdin_mode {
154            StdinMode::Inherit => {}
155            StdinMode::Piped => {
156                command.stdin(Stdio::piped());
157            }
158            StdinMode::Null => {
159                command.stdin(Stdio::null());
160            }
161        }
162        if self.config.capture {
163            command.stdout(Stdio::piped());
164            command.stderr(Stdio::piped());
165        }
166
167        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
168        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
169        #[cfg(windows)]
170        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
171            .map_err(ProcessError::Spawn)?;
172        if self.config.capture {
173            let stdout = child.stdout.take().expect("stdout pipe missing");
174            let stderr = child.stderr.take().expect("stderr pipe missing");
175            #[cfg(windows)]
176            {
177                use std::os::windows::io::AsRawHandle;
178                let mut handles = self
179                    .capture_pipe_handles
180                    .lock()
181                    .expect("capture pipe handles mutex poisoned");
182                handles.stdout = Some(stdout.as_raw_handle() as usize);
183                handles.stderr = Some(stderr.as_raw_handle() as usize);
184            }
185            self.spawn_reader(
186                stdout,
187                StreamKind::Stdout,
188                StreamKind::Stdout,
189                self.pipe_done_callback(StreamKind::Stdout),
190            );
191            self.spawn_reader(
192                stderr,
193                StreamKind::Stderr,
194                match self.config.stderr_mode {
195                    StderrMode::Stdout => StreamKind::Stdout,
196                    StderrMode::Pipe => StreamKind::Stderr,
197                },
198                self.pipe_done_callback(StreamKind::Stderr),
199            );
200        }
201        *guard = Some(ChildState {
202            child,
203            #[cfg(windows)]
204            _job: job,
205        });
206        drop(guard);
207        self.spawn_exit_waiter();
208        Ok(())
209    }
210
211    /// Background thread that polls for process exit and stores the exit code
212    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
213    fn spawn_exit_waiter(&self) {
214        let child = Arc::clone(&self.child);
215        let shared = Arc::clone(&self.shared);
216        thread::spawn(move || loop {
217            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
218                return;
219            }
220            {
221                let mut guard = child.lock().expect("child mutex poisoned");
222                if let Some(child_state) = guard.as_mut() {
223                    match child_state.child.try_wait() {
224                        Ok(Some(status)) => {
225                            let code = exit_code(status);
226                            shared.returncode.store(code as i64, Ordering::Release);
227                            shared.condvar.notify_all();
228                            return;
229                        }
230                        Ok(None) => {}
231                        Err(_) => return,
232                    }
233                } else {
234                    return;
235                }
236            }
237            // #199: intentional — capture thread polling for
238            // child-exit. `try_wait` is non-blocking by design;
239            // we can't block here because the thread also drains
240            // pipe state alongside the exit check. 10ms keeps the
241            // CPU cost negligible while staying responsive.
242            thread::sleep(Duration::from_millis(10));
243        });
244    }
245
246    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
247        let mut guard = self.child.lock().expect("child mutex poisoned");
248        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
249        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
250        use std::io::Write;
251        stdin.write_all(data).map_err(ProcessError::Io)?;
252        stdin.flush().map_err(ProcessError::Io)?;
253        drop(child.stdin.take());
254        Ok(())
255    }
256
257    /// Write to the child's stdin without closing it afterwards, so the
258    /// caller can issue additional writes. Used by interactive
259    /// pipe-backed sessions (#130 milestone 3) where the daemon keeps
260    /// stdin open across multiple client input frames.
261    pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
262        let mut guard = self.child.lock().expect("child mutex poisoned");
263        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
264        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
265        use std::io::Write;
266        stdin.write_all(data).map_err(ProcessError::Io)?;
267        stdin.flush().map_err(ProcessError::Io)?;
268        Ok(())
269    }
270
271    /// Explicitly close the child's stdin (signals EOF to the child).
272    /// Idempotent: returns Ok if stdin was already closed.
273    pub fn close_stdin(&self) -> Result<(), ProcessError> {
274        let mut guard = self.child.lock().expect("child mutex poisoned");
275        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
276        drop(child.stdin.take());
277        Ok(())
278    }
279
280    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
281        // Fast path: check atomic set by background waiter thread.
282        if let Some(code) = self.returncode() {
283            return Ok(Some(code));
284        }
285        let mut guard = self.child.lock().expect("child mutex poisoned");
286        let Some(child_state) = guard.as_mut() else {
287            return Ok(self.returncode());
288        };
289        let child = &mut child_state.child;
290        let status = child.try_wait().map_err(ProcessError::Io)?;
291        if let Some(status) = status {
292            let code = exit_code(status);
293            self.set_returncode(code);
294            return Ok(Some(code));
295        }
296        Ok(None)
297    }
298
299    // Preserve a stable Rust frame here in release user dumps.
300    #[inline(never)]
301    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
302        public_symbols::rp_native_process_wait_public(self, timeout)
303    }
304
305    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
306        crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
307        if self.child.lock().expect("child mutex poisoned").is_none() {
308            return self.returncode().ok_or(ProcessError::NotRunning);
309        }
310        // Fast path: already exited.
311        if let Some(code) = self.returncode() {
312            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
313            return Ok(code);
314        }
315        let start = Instant::now();
316        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
317        loop {
318            // Check returncode (set by exit-waiter thread via atomic + condvar).
319            let rc = self.shared.returncode.load(Ordering::Acquire);
320            if rc != RETURNCODE_NOT_SET {
321                drop(guard);
322                let code = rc as i32;
323                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
324                return Ok(code);
325            }
326            if let Some(limit) = timeout {
327                let elapsed = start.elapsed();
328                if elapsed >= limit {
329                    return Err(ProcessError::Timeout);
330                }
331                let remaining = limit - elapsed;
332                // Wait on condvar with timeout, capped at 50ms to recheck.
333                let wait_time = remaining.min(Duration::from_millis(50));
334                guard = self
335                    .shared
336                    .condvar
337                    .wait_timeout(guard, wait_time)
338                    .expect("queue mutex poisoned")
339                    .0;
340            } else {
341                // Wait on condvar with periodic recheck.
342                guard = self
343                    .shared
344                    .condvar
345                    .wait_timeout(guard, Duration::from_millis(50))
346                    .expect("queue mutex poisoned")
347                    .0;
348            }
349        }
350    }
351
352    // Preserve a stable Rust frame here in release user dumps.
353    #[inline(never)]
354    pub fn kill(&self) -> Result<(), ProcessError> {
355        public_symbols::rp_native_process_kill_public(self)
356    }
357
358    fn kill_impl(&self) -> Result<(), ProcessError> {
359        crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
360        {
361            let mut guard = self.child.lock().expect("child mutex poisoned");
362            let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
363            child.kill().map_err(ProcessError::Io)?;
364            let status = child.wait().map_err(ProcessError::Io)?;
365            self.set_returncode(exit_code(status));
366        }
367        // On Windows, interrupt any pending blocking `read()` in the
368        // per-stream reader threads so they fall out of their loops
369        // immediately. This is what makes the grandchild-pipe-orphan
370        // case (FastLED Bug B: uv.exe spawns a python.exe grandchild
371        // that inherits the pipe and outlives uv) wake up in
372        // microseconds instead of waiting for the bounded-drain
373        // safety-net deadline below.
374        #[cfg(windows)]
375        self.cancel_capture_io();
376        // Synchronize with the per-stream reader threads so that by the
377        // time kill() returns, the capture queues have flipped from
378        // "blocked on read" to "closed" and downstream pollers (e.g.
379        // take_combined_line) observe EOS instead of timeout. Without
380        // this, callers that hit a wait()-timeout path see Python code
381        // raise TimeoutError, kill the child, then race the reader
382        // threads — a 10ms poll loop can miss the EOS flip entirely.
383        //
384        // The deadline is the safety-net: on Windows `CancelIoEx`
385        // above almost always wakes the readers first; on POSIX, and
386        // in any pathological Windows case where `CancelIoEx` doesn't
387        // fire, the deadline guarantees `kill()` still returns.
388        public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
389            self,
390            kill_drain_deadline(),
391        );
392        Ok(())
393    }
394
395    pub fn terminate(&self) -> Result<(), ProcessError> {
396        self.kill()
397    }
398
399    /// Send the OS-appropriate soft termination signal to the child's
400    /// process group (POSIX: SIGTERM to `-pid`; Windows: no soft path
401    /// implemented yet — returns Ok without doing anything so callers
402    /// can run the same code on both platforms and rely on the post-
403    /// grace hard kill).
404    ///
405    /// Requires `ProcessConfig.create_process_group=true` on POSIX so
406    /// that `-pid` resolves to the child's own group. With the default
407    /// `create_process_group=false`, the kill would walk back to the
408    /// caller's group; the method silently no-ops in that case to avoid
409    /// signaling the wrong tree.
410    ///
411    /// Used by the daemon-side pipe sessions (#130 M4 follow-up) so
412    /// that `TerminationOutcome::SoftExit` becomes meaningful on POSIX.
413    pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
414        #[cfg(unix)]
415        {
416            if !self.config.create_process_group {
417                return Ok(());
418            }
419            let pid = match self.pid() {
420                Some(p) => p as i32,
421                None => return Err(ProcessError::NotRunning),
422            };
423            let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
424            if result != 0 {
425                let err = std::io::Error::last_os_error();
426                if err.raw_os_error() != Some(libc::ESRCH) {
427                    return Err(ProcessError::Io(err));
428                }
429            }
430            Ok(())
431        }
432        #[cfg(windows)]
433        {
434            if !self.config.create_process_group {
435                // GenerateConsoleCtrlEvent only routes to children
436                // spawned with CREATE_NEW_PROCESS_GROUP, and the
437                // event would otherwise hit the daemon's own group.
438                // No-op so the hard-kill schedule still wins.
439                return Ok(());
440            }
441            let pid = match self.pid() {
442                Some(p) => p,
443                None => return Err(ProcessError::NotRunning),
444            };
445            // GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT=1, pid).
446            // SAFETY: the FFI call is the standard Windows API; no
447            // borrowed Rust state is involved.
448            let ok = unsafe {
449                winapi::um::wincon::GenerateConsoleCtrlEvent(
450                    winapi::um::wincon::CTRL_BREAK_EVENT,
451                    pid,
452                )
453            };
454            if ok == 0 {
455                let err = std::io::Error::last_os_error();
456                // ERROR_INVALID_HANDLE means the child has already
457                // exited or has detached from the console — treat as
458                // success because the soft step's only goal is to
459                // give the child a chance to exit cleanly, and a
460                // dead/detached child does not need one.
461                if err.raw_os_error() != Some(6) {
462                    return Err(ProcessError::Io(err));
463                }
464            }
465            Ok(())
466        }
467    }
468
469    // Preserve a stable Rust frame here in release user dumps.
470    #[inline(never)]
471    pub fn close(&self) -> Result<(), ProcessError> {
472        public_symbols::rp_native_process_close_public(self)
473    }
474
475    fn close_impl(&self) -> Result<(), ProcessError> {
476        crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
477        if self.child.lock().expect("child mutex poisoned").is_none() {
478            return Ok(());
479        }
480        if self.poll()?.is_none() {
481            self.kill()?;
482        } else {
483            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
484        }
485        Ok(())
486    }
487
488    pub fn pid(&self) -> Option<u32> {
489        self.child
490            .lock()
491            .expect("child mutex poisoned")
492            .as_ref()
493            .map(|state| state.child.id())
494    }
495
496    pub fn returncode(&self) -> Option<i32> {
497        let v = self.shared.returncode.load(Ordering::Acquire);
498        if v == RETURNCODE_NOT_SET {
499            None
500        } else {
501            Some(v as i32)
502        }
503    }
504
505    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
506        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
507            return false;
508        }
509        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
510        match stream {
511            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
512            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
513        }
514    }
515
516    pub fn has_pending_combined(&self) -> bool {
517        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
518        !guard.combined_queue.is_empty()
519    }
520
521    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
522        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
523            return Vec::new();
524        }
525        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
526        let queue = match stream {
527            StreamKind::Stdout => &mut guard.stdout_queue,
528            StreamKind::Stderr => &mut guard.stderr_queue,
529        };
530        queue.drain(..).collect()
531    }
532
533    pub fn drain_combined(&self) -> Vec<StreamEvent> {
534        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
535        guard.combined_queue.drain(..).collect()
536    }
537
538    pub fn read_stream(
539        &self,
540        stream: StreamKind,
541        timeout: Option<Duration>,
542    ) -> ReadStatus<Vec<u8>> {
543        let deadline = timeout.map(|limit| Instant::now() + limit);
544        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
545
546        loop {
547            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
548                return ReadStatus::Eof;
549            }
550
551            let queue = match stream {
552                StreamKind::Stdout => &mut guard.stdout_queue,
553                StreamKind::Stderr => &mut guard.stderr_queue,
554            };
555            if let Some(line) = queue.pop_front() {
556                return ReadStatus::Line(line);
557            }
558
559            let closed = match stream {
560                StreamKind::Stdout => {
561                    if self.config.stderr_mode == StderrMode::Stdout {
562                        guard.stdout_closed && guard.stderr_closed
563                    } else {
564                        guard.stdout_closed
565                    }
566                }
567                StreamKind::Stderr => guard.stderr_closed,
568            };
569            if closed {
570                return ReadStatus::Eof;
571            }
572
573            match deadline {
574                Some(deadline) => {
575                    let now = Instant::now();
576                    if now >= deadline {
577                        return ReadStatus::Timeout;
578                    }
579                    let wait = deadline.saturating_duration_since(now);
580                    let result = self
581                        .shared
582                        .condvar
583                        .wait_timeout(guard, wait)
584                        .expect("queue mutex poisoned");
585                    guard = result.0;
586                    if result.1.timed_out() {
587                        return ReadStatus::Timeout;
588                    }
589                }
590                None => {
591                    guard = self
592                        .shared
593                        .condvar
594                        .wait(guard)
595                        .expect("queue mutex poisoned");
596                }
597            }
598        }
599    }
600
601    // Preserve a stable Rust frame here in release user dumps.
602    #[inline(never)]
603    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
604        public_symbols::rp_native_process_read_combined_public(self, timeout)
605    }
606
607    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
608        crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
609        let deadline = timeout.map(|limit| Instant::now() + limit);
610        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
611
612        loop {
613            if let Some(event) = guard.combined_queue.pop_front() {
614                return ReadStatus::Line(event);
615            }
616            if guard.stdout_closed && guard.stderr_closed {
617                return ReadStatus::Eof;
618            }
619
620            match deadline {
621                Some(deadline) => {
622                    let now = Instant::now();
623                    if now >= deadline {
624                        return ReadStatus::Timeout;
625                    }
626                    let wait = deadline.saturating_duration_since(now);
627                    let result = self
628                        .shared
629                        .condvar
630                        .wait_timeout(guard, wait)
631                        .expect("queue mutex poisoned");
632                    guard = result.0;
633                    if result.1.timed_out() {
634                        return ReadStatus::Timeout;
635                    }
636                }
637                None => {
638                    guard = self
639                        .shared
640                        .condvar
641                        .wait(guard)
642                        .expect("queue mutex poisoned");
643                }
644            }
645        }
646    }
647
648    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
649        self.shared
650            .queues
651            .lock()
652            .expect("queue mutex poisoned")
653            .stdout_history
654            .clone()
655            .into_iter()
656            .collect()
657    }
658
659    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
660        if self.config.stderr_mode == StderrMode::Stdout {
661            return Vec::new();
662        }
663        self.shared
664            .queues
665            .lock()
666            .expect("queue mutex poisoned")
667            .stderr_history
668            .clone()
669            .into_iter()
670            .collect()
671    }
672
673    pub fn captured_combined(&self) -> Vec<StreamEvent> {
674        self.shared
675            .queues
676            .lock()
677            .expect("queue mutex poisoned")
678            .combined_history
679            .clone()
680            .into_iter()
681            .collect()
682    }
683
684    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
685        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
686            return 0;
687        }
688        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
689        match stream {
690            StreamKind::Stdout => guard.stdout_history_bytes,
691            StreamKind::Stderr => guard.stderr_history_bytes,
692        }
693    }
694
695    pub fn captured_combined_bytes(&self) -> usize {
696        self.shared
697            .queues
698            .lock()
699            .expect("queue mutex poisoned")
700            .combined_history_bytes
701    }
702
703    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
704        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
705            return 0;
706        }
707        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
708        match stream {
709            StreamKind::Stdout => {
710                let released = guard.stdout_history_bytes;
711                guard.stdout_history.clear();
712                guard.stdout_history_bytes = 0;
713                released
714            }
715            StreamKind::Stderr => {
716                let released = guard.stderr_history_bytes;
717                guard.stderr_history.clear();
718                guard.stderr_history_bytes = 0;
719                released
720            }
721        }
722    }
723
724    pub fn clear_captured_combined(&self) -> usize {
725        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
726        let released = guard.combined_history_bytes;
727        guard.combined_history.clear();
728        guard.combined_history_bytes = 0;
729        released
730    }
731
732    fn build_command(&self) -> Command {
733        let mut command = match &self.config.command {
734            CommandSpec::Shell(command) => shell_command(command),
735            CommandSpec::Argv(argv) => {
736                let mut command = Command::new(&argv[0]);
737                if argv.len() > 1 {
738                    command.args(&argv[1..]);
739                }
740                command
741            }
742        };
743        if let Some(cwd) = &self.config.cwd {
744            command.current_dir(cwd);
745        }
746        if let Some(env) = &self.config.env {
747            command.env_clear();
748            command.envs(env.iter().map(|(k, v)| (k, v)));
749        }
750        #[cfg(windows)]
751        {
752            use std::os::windows::process::CommandExt;
753
754            // CREATE_NEW_PROCESS_GROUP makes GenerateConsoleCtrlEvent
755            // with CTRL_BREAK_EVENT route to this child's group
756            // (rather than the daemon's group) — required for the
757            // pipe-session soft-signal path on Windows (#130 M4).
758            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
759            let extra = if self.config.create_process_group {
760                CREATE_NEW_PROCESS_GROUP
761            } else {
762                0
763            };
764            let flags = self.config.creationflags.unwrap_or(0)
765                | extra
766                | windows_priority_flags(self.config.nice);
767            if flags != 0 {
768                command.creation_flags(flags);
769            }
770        }
771        #[cfg(unix)]
772        {
773            let create_process_group = self.config.create_process_group;
774            let nice = self.config.nice;
775
776            if create_process_group || nice.is_some() {
777                use std::os::unix::process::CommandExt;
778
779                unsafe {
780                    command.pre_exec(move || {
781                        if create_process_group && libc::setpgid(0, 0) == -1 {
782                            return Err(std::io::Error::last_os_error());
783                        }
784                        if let Some(nice) = nice {
785                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
786                            if result == -1 {
787                                return Err(std::io::Error::last_os_error());
788                            }
789                        }
790                        Ok(())
791                    });
792                }
793            }
794        }
795        command
796    }
797
798    fn spawn_reader<R>(
799        &self,
800        pipe: R,
801        source_stream: StreamKind,
802        visible_stream: StreamKind,
803        on_pipe_done: Box<dyn FnOnce() + Send>,
804    ) where
805        R: Read + Send + 'static,
806    {
807        let shared = Arc::clone(&self.shared);
808        thread::spawn(move || {
809            let mut reader = pipe;
810            let mut chunk = vec![0_u8; 65536];
811            let mut pending = Vec::new();
812
813            loop {
814                match reader.read(&mut chunk) {
815                    Ok(0) => break,
816                    Ok(n) => {
817                        let lines = feed_chunk(&mut pending, &chunk[..n]);
818                        emit_lines(&shared, visible_stream, lines);
819                    }
820                    Err(_) => break,
821                }
822            }
823
824            if !pending.is_empty() {
825                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
826            }
827
828            // Clear the parent-side pipe-handle slot under its mutex
829            // before dropping the reader. After this returns,
830            // `kill_impl` can no longer try to `CancelIoEx` on us, so
831            // it's safe for `reader`'s drop to close the HANDLE.
832            on_pipe_done();
833            drop(reader);
834
835            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
836            match source_stream {
837                StreamKind::Stdout => guard.stdout_closed = true,
838                StreamKind::Stderr => guard.stderr_closed = true,
839            }
840            shared.condvar.notify_all();
841        });
842    }
843
844    #[cfg(windows)]
845    fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
846        let handles = Arc::clone(&self.capture_pipe_handles);
847        Box::new(move || {
848            let mut guard = handles
849                .lock()
850                .expect("capture pipe handles mutex poisoned");
851            match stream {
852                StreamKind::Stdout => guard.stdout = None,
853                StreamKind::Stderr => guard.stderr = None,
854            }
855        })
856    }
857
858    #[cfg(not(windows))]
859    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
860        Box::new(|| {})
861    }
862
863    /// Cancel any pending blocking `read()` on the parent-side capture
864    /// pipes so the reader threads' `read()` calls return
865    /// `ERROR_OPERATION_ABORTED` immediately. Used by `kill_impl` to
866    /// break the grandchild-orphan deadlock without waiting on
867    /// `wait_for_capture_completion_with_deadline`'s safety-net.
868    #[cfg(windows)]
869    fn cancel_capture_io(&self) {
870        crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
871        use winapi::shared::ntdef::HANDLE;
872        use winapi::um::ioapiset::CancelIoEx;
873        let guard = self
874            .capture_pipe_handles
875            .lock()
876            .expect("capture pipe handles mutex poisoned");
877        if let Some(h) = guard.stdout {
878            // SAFETY: the slot is `Some` only while the owning reader
879            // thread still holds the `ChildStdout`, so the HANDLE is
880            // valid for the duration of this call. The reader is
881            // blocked in `lock()` on the same mutex if it's racing us
882            // toward exit, so it cannot drop the pipe and close the
883            // HANDLE until we return.
884            unsafe {
885                CancelIoEx(h as HANDLE, std::ptr::null_mut());
886            }
887        }
888        if let Some(h) = guard.stderr {
889            unsafe {
890                CancelIoEx(h as HANDLE, std::ptr::null_mut());
891            }
892        }
893    }
894
895    fn set_returncode(&self, code: i32) {
896        self.shared.returncode.store(code as i64, Ordering::Release);
897        self.shared.condvar.notify_all();
898    }
899
900    fn wait_for_capture_completion_impl(&self) {
901        crate::rp_rust_debug_scope!(
902            "running_process::NativeProcess::wait_for_capture_completion"
903        );
904        if !self.config.capture {
905            return;
906        }
907
908        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
909        while !(guard.stdout_closed && guard.stderr_closed) {
910            guard = self
911                .shared
912                .condvar
913                .wait(guard)
914                .expect("queue mutex poisoned");
915        }
916    }
917
918    /// Like `wait_for_capture_completion_impl` but bounded by `deadline`.
919    /// Returns `true` if the reader threads flipped both closed flags on
920    /// their own, `false` if the deadline elapsed first. On timeout the
921    /// closed flags are force-set (and waiters notified) so downstream
922    /// pollers stop seeing `Timeout` and start seeing `Eof`. A reader
923    /// thread that eventually unblocks after the OS releases the pipe
924    /// will assign `closed = true` again, which is a harmless no-op.
925    fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
926        crate::rp_rust_debug_scope!(
927            "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
928        );
929        if !self.config.capture {
930            return true;
931        }
932
933        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
934        while !(guard.stdout_closed && guard.stderr_closed) {
935            let now = Instant::now();
936            if now >= deadline {
937                guard.stdout_closed = true;
938                guard.stderr_closed = true;
939                self.shared.condvar.notify_all();
940                return false;
941            }
942            let (next_guard, result) = self
943                .shared
944                .condvar
945                .wait_timeout(guard, deadline - now)
946                .expect("queue mutex poisoned");
947            guard = next_guard;
948            if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
949                guard.stdout_closed = true;
950                guard.stderr_closed = true;
951                self.shared.condvar.notify_all();
952                return false;
953            }
954        }
955        true
956    }
957}
958
959fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
960    if lines.is_empty() {
961        return;
962    }
963    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
964    for line in lines {
965        let line_len = line.len();
966        match stream {
967            StreamKind::Stdout => {
968                guard.stdout_history_bytes += line_len;
969                guard.stdout_history.push_back(line.clone());
970                guard.stdout_queue.push_back(line.clone());
971            }
972            StreamKind::Stderr => {
973                guard.stderr_history_bytes += line_len;
974                guard.stderr_history.push_back(line.clone());
975                guard.stderr_queue.push_back(line.clone());
976            }
977        }
978        let event = StreamEvent { stream, line };
979        guard.combined_history_bytes += line_len;
980        guard.combined_history.push_back(event.clone());
981        guard.combined_queue.push_back(event);
982    }
983    shared.condvar.notify_all();
984}
985
986pub(crate) fn shell_command(command: &str) -> Command {
987    #[cfg(windows)]
988    {
989        use std::os::windows::process::CommandExt;
990
991        let mut cmd = Command::new("cmd");
992        cmd.raw_arg("/D /S /C \"");
993        cmd.raw_arg(command);
994        cmd.raw_arg("\"");
995        cmd
996    }
997    #[cfg(not(windows))]
998    {
999        let mut cmd = Command::new("sh");
1000        cmd.arg("-lc").arg(command);
1001        cmd
1002    }
1003}
1004
1005#[cfg(test)]
1006mod tests;