Skip to main content

coreshift_core/spawn/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Process spawning and lifecycle management.
6//!
7//! This module exposes explicit Linux/Android process primitives. Callers must
8//! provide the exact argument vector and choose the spawn backend. Core does not
9//! infer shell/root behavior, select backends from platform properties, or
10//! silently switch between backends.
11
12use std::ffi::CString;
13use std::mem::MaybeUninit;
14use std::os::unix::io::RawFd;
15use std::ptr;
16
17use crate::CoreError;
18use crate::error::{posix_ret, syscall_ret};
19use crate::reactor::Fd;
20use crate::signal::SignalRuntime;
21use libc::{
22    O_CLOEXEC, O_NONBLOCK, WEXITSTATUS, WIFEXITED, WIFSIGNALED, WTERMSIG, c_char, pid_t, pipe2,
23    waitpid,
24};
25
// The C runtime's process environment block. It is declared here so the
// parent's environment can be handed to `posix_spawn` when the caller did not
// configure an explicit environment for the child.
unsafe extern "C" {
    pub(crate) static mut environ: *mut *mut libc::c_char;
}
29
// Flag bits OR-ed together and passed to `posix_spawnattr_setflags`.
// NOTE(review): the numeric values are hard-coded here; presumably they mirror
// `<spawn.h>` on the supported Linux/Android libcs — confirm when adding targets.
pub(crate) const POSIX_SPAWN_SETPGROUP: i32 = 2;
pub(crate) const POSIX_SPAWN_SETSIGDEF: i32 = 4;
pub(crate) const POSIX_SPAWN_SETSIGMASK: i32 = 8;
33
// Hand-written bindings for the `posix_spawn` family.
//
// Every function returns a `c_int` result code; this module feeds those
// results through `posix_ret` to turn them into `CoreError`s.
unsafe extern "C" {
    /// Spawn `path` as a child process; on success the child's PID is stored
    /// through `pid`.
    pub(crate) fn posix_spawn(
        pid: *mut libc::pid_t,
        path: *const libc::c_char,
        file_actions: *const libc::posix_spawn_file_actions_t,
        attrp: *const libc::posix_spawnattr_t,
        argv: *const *mut libc::c_char,
        envp: *const *mut libc::c_char,
    ) -> libc::c_int;

    /// Queue a `close(fd)` to be performed in the child.
    pub(crate) fn posix_spawn_file_actions_addclose(
        file_actions: *mut libc::posix_spawn_file_actions_t,
        fd: libc::c_int,
    ) -> libc::c_int;

    /// Queue a `dup2(fd, newfd)` to be performed in the child.
    pub(crate) fn posix_spawn_file_actions_adddup2(
        file_actions: *mut libc::posix_spawn_file_actions_t,
        fd: libc::c_int,
        newfd: libc::c_int,
    ) -> libc::c_int;

    /// Release a file-actions object created by `posix_spawn_file_actions_init`.
    pub(crate) fn posix_spawn_file_actions_destroy(
        file_actions: *mut libc::posix_spawn_file_actions_t,
    ) -> libc::c_int;

    /// Initialize an empty file-actions object.
    pub(crate) fn posix_spawn_file_actions_init(
        file_actions: *mut libc::posix_spawn_file_actions_t,
    ) -> libc::c_int;

    /// Release a spawn-attributes object created by `posix_spawnattr_init`.
    pub(crate) fn posix_spawnattr_destroy(attr: *mut libc::posix_spawnattr_t) -> libc::c_int;

    /// Initialize a spawn-attributes object.
    pub(crate) fn posix_spawnattr_init(attr: *mut libc::posix_spawnattr_t) -> libc::c_int;

    /// Set the `POSIX_SPAWN_*` flag bits on `attr`.
    pub(crate) fn posix_spawnattr_setflags(
        attr: *mut libc::posix_spawnattr_t,
        flags: libc::c_short,
    ) -> libc::c_int;

    /// Set the process group used when `POSIX_SPAWN_SETPGROUP` is enabled.
    pub(crate) fn posix_spawnattr_setpgroup(
        attr: *mut libc::posix_spawnattr_t,
        pgroup: libc::pid_t,
    ) -> libc::c_int;

    /// Set the signals reset to their default disposition when
    /// `POSIX_SPAWN_SETSIGDEF` is enabled.
    pub(crate) fn posix_spawnattr_setsigdefault(
        attr: *mut libc::posix_spawnattr_t,
        sigdefault: *const libc::sigset_t,
    ) -> libc::c_int;

    /// Set the child's signal mask used when `POSIX_SPAWN_SETSIGMASK` is enabled.
    pub(crate) fn posix_spawnattr_setsigmask(
        attr: *mut libc::posix_spawnattr_t,
        sigmask: *const libc::sigset_t,
    ) -> libc::c_int;
}
87
/// Policy for handling process cancellation or timeouts.
///
/// The derived `Default` is [`CancelPolicy::None`]. Note that
/// `SpawnOptionsBuilder::new` starts from [`CancelPolicy::Kill`] instead, so
/// the two defaults differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CancelPolicy {
    /// Do nothing on cancellation; let the process run to completion.
    #[default]
    None,
    /// Send SIGTERM, then SIGKILL after a grace period.
    Graceful,
    /// Send SIGKILL immediately.
    Kill,
}
99
/// Process group and session configuration.
///
/// The derived `Default` leaves both settings off (`leader: None`,
/// `isolated: false`).
#[derive(Debug, Clone, Copy, Default)]
pub struct ProcessGroup {
    /// Join an existing process group leader.
    ///
    /// With the `posix_spawn` backend this value is passed to
    /// `posix_spawnattr_setpgroup`.
    pub leader: Option<pid_t>,
    /// Create a new session (`setsid`).
    ///
    /// Rejected by backend validation when the `posix_spawn` backend is used.
    pub isolated: bool,
}
108
109impl ProcessGroup {
110    /// Create a new process group configuration.
111    pub fn new(leader: Option<pid_t>, isolated: bool) -> Self {
112        Self { leader, isolated }
113    }
114}
115
/// Return the calling thread's last OS error code, or `0` if none is available.
#[inline(always)]
fn errno() -> i32 {
    match std::io::Error::last_os_error().raw_os_error() {
        Some(code) => code,
        None => 0,
    }
}
120
121/// Creates a pipe with O_CLOEXEC | O_NONBLOCK flags.
122/// Invariants: FDs returned are strictly non-negative and will close automatically on drop.
123#[inline(always)]
124fn make_pipe() -> Result<(Fd, Fd), CoreError> {
125    let mut fds = [0; 2];
126    let r = unsafe { pipe2(fds.as_mut_ptr(), O_CLOEXEC | O_NONBLOCK) };
127    syscall_ret(r, "pipe2")?;
128    Ok((Fd::new(fds[0], "pipe2")?, Fd::new(fds[1], "pipe2")?))
129}
130
131fn make_cloexec_pipe() -> Result<(RawFd, RawFd), CoreError> {
132    let mut fds = [0; 2];
133    let r = unsafe { pipe2(fds.as_mut_ptr(), O_CLOEXEC) };
134    syscall_ret(r, "pipe2")?;
135    Ok((fds[0], fds[1]))
136}
137
/// Identifies which child-side setup step failed after `fork`.
///
/// The `u8` discriminant is the tag byte carried in the child's error report.
#[repr(u8)]
#[derive(Clone, Copy)]
enum ChildSetupOp {
    DupStdin = 1,
    DupStdout = 2,
    DupStderr = 3,
    Setsid = 4,
    Chdir = 5,
    Setpgid = 6,
    SignalMask = 7,
    Execve = 8,
}

impl ChildSetupOp {
    /// Every known operation, used to decode a tag byte.
    const KNOWN: [Self; 8] = [
        Self::DupStdin,
        Self::DupStdout,
        Self::DupStderr,
        Self::Setsid,
        Self::Chdir,
        Self::Setpgid,
        Self::SignalMask,
        Self::Execve,
    ];

    /// Static context string used when the failure is turned into an error.
    fn as_str(self) -> &'static str {
        match self {
            Self::Execve => "fork child execve",
            Self::SignalMask => "fork child signal setup",
            Self::Setpgid => "fork child setpgid",
            Self::Chdir => "fork child chdir",
            Self::Setsid => "fork child setsid",
            Self::DupStderr => "fork child dup2 stderr",
            Self::DupStdout => "fork child dup2 stdout",
            Self::DupStdin => "fork child dup2 stdin",
        }
    }

    /// Decode a tag byte; any unrecognized value is treated as `Execve`.
    fn from_u8(value: u8) -> Self {
        Self::KNOWN
            .iter()
            .copied()
            .find(|op| *op as u8 == value)
            .unwrap_or(Self::Execve)
    }
}
178
179unsafe fn report_child_setup_error(fd: RawFd, op: ChildSetupOp, code: i32) -> ! {
180    let mut msg = [0u8; 5];
181    msg[..4].copy_from_slice(&code.to_ne_bytes());
182    msg[4] = op as u8;
183    let mut written = 0;
184    while written < msg.len() {
185        let n = unsafe {
186            libc::write(
187                fd,
188                msg[written..].as_ptr().cast::<libc::c_void>(),
189                msg.len() - written,
190            )
191        };
192        if n <= 0 {
193            break;
194        }
195        written += n as usize;
196    }
197    unsafe {
198        libc::_exit(127);
199    }
200}
201
202fn read_child_setup_error(fd: RawFd) -> Result<Option<CoreError>, CoreError> {
203    let mut msg = [0u8; 5];
204    let mut read_len = 0;
205    loop {
206        let n = unsafe {
207            libc::read(
208                fd,
209                msg[read_len..].as_mut_ptr().cast::<libc::c_void>(),
210                msg.len() - read_len,
211            )
212        };
213        if n == 0 {
214            return Ok(None);
215        }
216        if n < 0 {
217            let code = errno();
218            if code == libc::EINTR {
219                continue;
220            }
221            return Err(CoreError::sys(code, "read fork child setup error"));
222        }
223        read_len += n as usize;
224        if read_len == msg.len() {
225            let code = i32::from_ne_bytes([msg[0], msg[1], msg[2], msg[3]]);
226            return Ok(Some(CoreError::sys(
227                code,
228                ChildSetupOp::from_u8(msg[4]).as_str(),
229            )));
230        }
231    }
232}
233
/// Optional stdio pipe pairs for a child process.
///
/// Each stream has a read end (`*_r`) and a write end (`*_w`); both are `None`
/// when that stream is not piped. Taking an end out of its `Option` drops the
/// `Fd`.
struct Pipes {
    stdin_r: Option<Fd>,
    stdin_w: Option<Fd>,
    stdout_r: Option<Fd>,
    stdout_w: Option<Fd>,
    stderr_r: Option<Fd>,
    stderr_w: Option<Fd>,
}
242
243impl Pipes {
244    fn new(in_buf: Option<&[u8]>, out: bool, err: bool) -> Result<Self, CoreError> {
245        let (stdin_r, stdin_w) = if in_buf.is_some() {
246            let (r, w) = make_pipe()?;
247            (Some(r), Some(w))
248        } else {
249            (None, None)
250        };
251
252        let (stdout_r, stdout_w) = if out {
253            let (r, w) = make_pipe()?;
254            (Some(r), Some(w))
255        } else {
256            (None, None)
257        };
258
259        let (stderr_r, stderr_w) = if err {
260            let (r, w) = make_pipe()?;
261            (Some(r), Some(w))
262        } else {
263            (None, None)
264        };
265
266        Ok(Self {
267            stdin_r,
268            stdin_w,
269            stdout_r,
270            stdout_w,
271            stderr_r,
272            stderr_w,
273        })
274    }
275
276    #[inline(always)]
277    fn close_all(&mut self) {
278        self.stdin_r.take();
279        self.stdin_w.take();
280        self.stdout_r.take();
281        self.stdout_w.take();
282        self.stderr_r.take();
283        self.stderr_w.take();
284    }
285}
286
/// Represents the termination status of a process.
///
/// Wait statuses that are neither a normal exit nor a signal termination are
/// decoded as `Exited(-1)`.
#[derive(Debug, PartialEq, Eq)]
pub enum ExitStatus {
    /// Process exited normally with the specified code.
    Exited(i32),
    /// Process was terminated by a signal.
    Signaled(i32),
}
295
/// Explicit process spawning backend.
///
/// The caller always chooses the backend; unsupported option combinations are
/// rejected with `EINVAL` rather than silently switching backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpawnBackend {
    /// Force the use of `posix_spawn`.
    ///
    /// Validation rejects a working directory, session isolation, and any
    /// [`SpawnFdPolicy`] other than `CloexecOnly` for this backend.
    PosixSpawn,
    /// Force the use of `fork`/`exec`.
    ///
    /// The fork backend supports explicit [`SpawnFdPolicy`] handling before
    /// `execve`.
    Fork,
}
307
/// Explicit file-descriptor inheritance policy for spawned children.
///
/// The default is [`SpawnFdPolicy::CloexecOnly`], which is also the only
/// policy accepted by the `posix_spawn` backend.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SpawnFdPolicy {
    /// Inherit descriptors according to their existing `FD_CLOEXEC` flags.
    #[default]
    CloexecOnly,
    /// For the fork backend, close every descriptor >= 3 before `execve`,
    /// except Core-required pipe descriptors.
    CloseFrom3,
    /// For the fork backend, close every descriptor >= 3 before `execve`,
    /// except Core-required pipe descriptors and the listed descriptors.
    ///
    /// Core does not close allowlisted descriptors, but their existing
    /// `FD_CLOEXEC` state still applies. Callers that want an allowlisted
    /// descriptor to survive `execve` must clear `FD_CLOEXEC` before spawning.
    ///
    /// Validation rejects negative descriptors, descriptors for which
    /// `fcntl(F_GETFD)` fails, and duplicate entries.
    Allowlist(Vec<RawFd>),
}
325
/// Owned argument vector storage for spawn internals.
///
/// Currently has a single variant; element 0 is used as the executable path
/// by the `posix_spawn` backend.
#[derive(Clone)]
enum ExecArgv {
    /// Dynamically allocated C-compatible strings.
    Dynamic(Vec<CString>),
}
332
/// Validated execution context for process spawning.
///
/// All strings have already been converted to `CString`, so they are known to
/// contain no interior NUL bytes.
#[derive(Clone)]
struct ExecContext {
    // Argument vector; guaranteed non-empty by `ExecContext::new`.
    argv: ExecArgv,
    // Explicit environment entries, or `None` when none were configured.
    envp: Option<Vec<CString>>,
    // Working directory for the child, if one was requested.
    cwd: Option<CString>,
}
340
341impl ExecContext {
342    /// Build a validated execution context for process spawn.
343    fn new(
344        argv: Vec<String>,
345        env: Option<Vec<String>>,
346        cwd: Option<String>,
347    ) -> Result<Self, CoreError> {
348        if argv.is_empty() {
349            return Err(CoreError::sys(libc::EINVAL, "exec argv empty"));
350        }
351
352        let c_argv: Vec<CString> = argv
353            .into_iter()
354            .map(|s| {
355                CString::new(s).map_err(|_| CoreError::sys(libc::EINVAL, "exec argv contains nul"))
356            })
357            .collect::<Result<_, _>>()?;
358
359        let c_envp = match env {
360            Some(vars) => Some(
361                vars.into_iter()
362                    .map(|s| {
363                        CString::new(s)
364                            .map_err(|_| CoreError::sys(libc::EINVAL, "exec env contains nul"))
365                    })
366                    .collect::<Result<Vec<_>, _>>()?,
367            ),
368            None => None,
369        };
370
371        let c_cwd = match cwd {
372            Some(c) => Some(
373                CString::new(c)
374                    .map_err(|_| CoreError::sys(libc::EINVAL, "exec cwd contains nul"))?,
375            ),
376            None => None,
377        };
378
379        Ok(Self {
380            argv: ExecArgv::Dynamic(c_argv),
381            envp: c_envp,
382            cwd: c_cwd,
383        })
384    }
385
386    /// Return a vector of pointers to the argument strings.
387    fn get_argv_ptrs(&self) -> Vec<*mut libc::c_char> {
388        let mut ptrs = Vec::new();
389        match &self.argv {
390            ExecArgv::Dynamic(v) => {
391                for s in v {
392                    ptrs.push(s.as_ptr() as *mut libc::c_char);
393                }
394            }
395        }
396        ptrs.push(ptr::null_mut());
397        ptrs
398    }
399
400    /// Return a vector of pointers to the environment strings.
401    fn get_envp_ptrs(&self) -> Option<Vec<*mut libc::c_char>> {
402        self.envp.as_ref().map(|envp| {
403            let mut ptrs = Vec::new();
404            for s in envp {
405                ptrs.push(s.as_ptr() as *mut libc::c_char);
406            }
407            ptrs.push(ptr::null_mut());
408            ptrs
409        })
410    }
411}
412
413#[inline(always)]
414fn decode_status(status: i32) -> ExitStatus {
415    if WIFEXITED(status) {
416        ExitStatus::Exited(WEXITSTATUS(status))
417    } else if WIFSIGNALED(status) {
418        ExitStatus::Signaled(WTERMSIG(status))
419    } else {
420        ExitStatus::Exited(-1)
421    }
422}
423
/// A handle to a spawned process.
///
/// The handle only stores the PID; it can also be built for an arbitrary PID
/// through `Process::new`.
pub struct Process {
    // PID passed to `waitpid` and `kill`.
    pid: pid_t,
}
428
429impl Process {
430    /// Create a handle for an existing PID.
431    pub fn new(pid: pid_t) -> Self {
432        Self { pid }
433    }
434
435    /// Return the process ID.
436    pub fn pid(&self) -> pid_t {
437        self.pid
438    }
439
440    /// Perform a non-blocking wait for process termination.
441    pub fn wait_step(&self) -> Result<Option<ExitStatus>, CoreError> {
442        loop {
443            let mut status = 0;
444            let r = unsafe { waitpid(self.pid, &mut status, libc::WNOHANG) };
445            if r == 0 {
446                return Ok(None);
447            }
448            if r < 0 {
449                let e = errno();
450                if e == libc::EINTR {
451                    continue;
452                }
453                return Err(CoreError::sys(e, "waitpid_step"));
454            }
455            return Ok(Some(decode_status(status)));
456        }
457    }
458
459    /// Block until the process terminates.
460    pub fn wait_blocking(&self) -> Result<ExitStatus, CoreError> {
461        loop {
462            let mut status = 0;
463            let r = unsafe { waitpid(self.pid, &mut status, 0) };
464            if r < 0 {
465                let e = errno();
466                if e == libc::EINTR {
467                    continue;
468                }
469                return Err(CoreError::sys(e, "waitpid_blocking"));
470            }
471            return Ok(decode_status(status));
472        }
473    }
474
475    /// Send a signal to the process.
476    pub fn kill(&self, sig: i32) -> Result<(), CoreError> {
477        let r = unsafe { libc::kill(self.pid, sig) };
478        if r < 0 {
479            let e = errno();
480            if e == libc::ESRCH {
481                return Ok(());
482            }
483            syscall_ret(-1, "kill")?;
484        }
485        Ok(())
486    }
487
488    /// Send a signal to the process group.
489    pub fn kill_pgroup(&self, sig: i32) -> Result<(), CoreError> {
490        let r = unsafe { libc::kill(-self.pid, sig) };
491        if r < 0 {
492            let e = errno();
493            if e == libc::ESRCH {
494                return Ok(());
495            }
496            syscall_ret(-1, "kill_pgroup")?;
497        }
498        Ok(())
499    }
500}
501
/// Configuration options for spawning a new process.
///
/// Built through [`SpawnOptionsBuilder`]; the argument vector, environment and
/// working directory have already been validated into `ctx`.
#[derive(Clone)]
pub struct SpawnOptions {
    // Validated argv / environment / working directory.
    ctx: ExecContext,
    // Bytes to write to the child's stdin, if any.
    stdin: Option<Box<[u8]>>,
    // Whether the child's stdout / stderr are captured.
    capture_stdout: bool,
    capture_stderr: bool,
    // Whether to wait for termination; must be true when any stdio is piped.
    wait: bool,
    // Process group / session settings.
    pgroup: ProcessGroup,
    // Combined stdout+stderr capture limit in bytes.
    max_output: usize,
    // Execution timeout in milliseconds, if any.
    timeout_ms: Option<u32>,
    // Grace period before SIGKILL, in milliseconds.
    kill_grace_ms: u32,
    // What to do with the child on cancellation or timeout.
    cancel: CancelPolicy,
    // Explicitly chosen spawn backend.
    backend: SpawnBackend,
    // Descriptor inheritance policy.
    fd_policy: SpawnFdPolicy,
    // Optional early-exit callback.
    // NOTE(review): presumably evaluated against captured stdout — confirm in the drain code.
    early_exit: Option<fn(&[u8]) -> bool>,
}
519
impl SpawnOptions {
    /// Create a new builder for process spawning.
    ///
    /// Equivalent to [`SpawnOptionsBuilder::new`].
    pub fn builder(argv: Vec<String>, backend: SpawnBackend) -> SpawnOptionsBuilder {
        SpawnOptionsBuilder::new(argv, backend)
    }

    /// Execute the process according to the options and block until completion.
    ///
    /// Consumes the options and delegates to [`spawn`].
    ///
    /// # Errors
    /// Returns whatever [`spawn`] returns.
    pub fn run(self) -> Result<Output, CoreError> {
        spawn(self)
    }
}
531
/// Builder for [`SpawnOptions`].
///
/// Holds the caller's raw strings; they are validated only when
/// [`SpawnOptionsBuilder::build`] is called.
#[derive(Clone)]
pub struct SpawnOptionsBuilder {
    // Unvalidated argument vector, environment and working directory.
    argv: Vec<String>,
    env: Option<Vec<String>>,
    cwd: Option<String>,
    // Remaining fields are copied unchanged into `SpawnOptions` by `build`.
    stdin: Option<Box<[u8]>>,
    capture_stdout: bool,
    capture_stderr: bool,
    wait: bool,
    pgroup: ProcessGroup,
    max_output: usize,
    timeout_ms: Option<u32>,
    kill_grace_ms: u32,
    cancel: CancelPolicy,
    backend: SpawnBackend,
    fd_policy: SpawnFdPolicy,
    early_exit: Option<fn(&[u8]) -> bool>,
}
551
552impl SpawnOptionsBuilder {
553    /// Create a new builder with the specified argument vector.
554    pub fn new(argv: Vec<String>, backend: SpawnBackend) -> Self {
555        Self {
556            argv,
557            env: None,
558            cwd: None,
559            stdin: None,
560            capture_stdout: false,
561            capture_stderr: false,
562            wait: true,
563            pgroup: ProcessGroup::default(),
564            max_output: 1024 * 1024,
565            timeout_ms: None,
566            kill_grace_ms: 2000,
567            cancel: CancelPolicy::Kill,
568            backend,
569            fd_policy: SpawnFdPolicy::default(),
570            early_exit: None,
571        }
572    }
573
574    /// Set environment variables.
575    pub fn env(mut self, env: Vec<String>) -> Self {
576        self.env = Some(env);
577        self
578    }
579
580    /// Set the working directory.
581    pub fn cwd(mut self, cwd: String) -> Self {
582        self.cwd = Some(cwd);
583        self
584    }
585
586    /// Provide data to be written to the child's stdin.
587    pub fn stdin(mut self, data: impl Into<Box<[u8]>>) -> Self {
588        self.stdin = Some(data.into());
589        self
590    }
591
592    /// Enable stdout capture.
593    pub fn capture_stdout(mut self) -> Self {
594        self.capture_stdout = true;
595        self
596    }
597
598    /// Enable stderr capture.
599    pub fn capture_stderr(mut self) -> Self {
600        self.capture_stderr = true;
601        self
602    }
603
604    /// Set whether to wait for the process to terminate (default: true).
605    pub fn wait(mut self, wait: bool) -> Self {
606        self.wait = wait;
607        self
608    }
609
610    /// Set process group and isolation policy.
611    pub fn pgroup(mut self, pgroup: ProcessGroup) -> Self {
612        self.pgroup = pgroup;
613        self
614    }
615
616    /// Set the combined stdout+stderr output buffer size (default: 1MB).
617    ///
618    /// If captured output exceeds this limit, spawn drains the child pipes to
619    /// completion and returns `EOVERFLOW`.
620    pub fn max_output(mut self, max: usize) -> Self {
621        self.max_output = max;
622        self
623    }
624
625    /// Set the execution timeout in milliseconds.
626    pub fn timeout_ms(mut self, ms: u32) -> Self {
627        self.timeout_ms = Some(ms);
628        self
629    }
630
631    /// Set the grace period before SIGKILL (default: 2s).
632    pub fn kill_grace_ms(mut self, ms: u32) -> Self {
633        self.kill_grace_ms = ms;
634        self
635    }
636
637    /// Set the cancellation policy (default: Kill).
638    pub fn cancel(mut self, policy: CancelPolicy) -> Self {
639        self.cancel = policy;
640        self
641    }
642
643    /// Set the child file-descriptor inheritance policy.
644    pub fn fd_policy(mut self, policy: SpawnFdPolicy) -> Self {
645        self.fd_policy = policy;
646        self
647    }
648
649    /// Set an early exit callback.
650    pub fn early_exit(mut self, callback: fn(&[u8]) -> bool) -> Self {
651        self.early_exit = Some(callback);
652        self
653    }
654
655    /// Build the spawn options.
656    pub fn build(self) -> Result<SpawnOptions, CoreError> {
657        let ctx = ExecContext::new(self.argv, self.env, self.cwd)?;
658        Ok(SpawnOptions {
659            ctx,
660            stdin: self.stdin,
661            capture_stdout: self.capture_stdout,
662            capture_stderr: self.capture_stderr,
663            wait: self.wait,
664            pgroup: self.pgroup,
665            max_output: self.max_output,
666            timeout_ms: self.timeout_ms,
667            kill_grace_ms: self.kill_grace_ms,
668            cancel: self.cancel,
669            backend: self.backend,
670            fd_policy: self.fd_policy,
671            early_exit: self.early_exit,
672        })
673    }
674}
675
/// The result of a process execution.
///
/// When the process was started with `wait = false`, `status` is `None` and
/// both `timed_out` and `stdout_early_exited` are `false`.
#[derive(Debug)]
pub struct Output {
    /// The PID of the finished process.
    pub pid: pid_t,
    /// Final exit status (None if `wait=false`).
    pub status: Option<ExitStatus>,
    /// Captured stdout buffer.
    pub stdout: Vec<u8>,
    /// Captured stderr buffer.
    pub stderr: Vec<u8>,
    /// Whether the process timed out.
    pub timed_out: bool,
    /// Whether stdout drain stopped because the early-exit callback matched.
    pub stdout_early_exited: bool,
}
692
693fn validate_backend(opts: &SpawnOptions) -> Result<(), CoreError> {
694    validate_fd_policy(&opts.fd_policy)?;
695    match opts.backend {
696        SpawnBackend::PosixSpawn => {
697            if opts.ctx.cwd.is_some() {
698                return Err(CoreError::sys(libc::EINVAL, "posix_spawn cwd unsupported"));
699            }
700            if opts.pgroup.isolated {
701                return Err(CoreError::sys(
702                    libc::EINVAL,
703                    "posix_spawn setsid unsupported",
704                ));
705            }
706            if opts.fd_policy != SpawnFdPolicy::CloexecOnly {
707                return Err(CoreError::sys(
708                    libc::EINVAL,
709                    "posix_spawn fd policy unsupported",
710                ));
711            }
712            Ok(())
713        }
714        SpawnBackend::Fork => Ok(()),
715    }
716}
717
718fn validate_fd_policy(policy: &SpawnFdPolicy) -> Result<(), CoreError> {
719    if let SpawnFdPolicy::Allowlist(fds) = policy {
720        let mut seen = Vec::with_capacity(fds.len());
721        for &fd in fds {
722            if fd < 0 {
723                return Err(CoreError::sys(libc::EINVAL, "spawn fd allowlist invalid"));
724            }
725            let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
726            if flags < 0 {
727                return Err(CoreError::sys(errno(), "spawn fd allowlist fcntl(F_GETFD)"));
728            }
729            if seen.contains(&fd) {
730                return Err(CoreError::sys(libc::EINVAL, "spawn fd allowlist duplicate"));
731            }
732            seen.push(fd);
733        }
734    }
735    Ok(())
736}
737
738use crate::io::DrainState;
739
/// Specialized drain state for process spawning.
///
/// This is [`DrainState`] instantiated with a plain function pointer as its
/// callback type, matching the `early_exit` option of [`SpawnOptions`].
pub type SpawnDrain = DrainState<fn(&[u8]) -> bool>;
742
/// A process that is currently running and being monitored.
///
/// Returned by [`spawn_start`]. The stdio drain state stays private and is
/// driven through the methods on this type.
pub struct RunningProcess {
    /// Handle to the process.
    pub process: Process,
    // Drain state for the child's piped stdin/stdout/stderr.
    drain: SpawnDrain,
}
749
750impl RunningProcess {
751    /// Register active stdio pipe descriptors with a reactor.
752    ///
753    /// Call this once after [`spawn_start`] when the process was started with
754    /// captured output or stdin data. The assigned tokens are kept internally
755    /// and later matched by [`Self::handle_reactor_event`].
756    pub fn register_with_reactor(&mut self, reactor: &mut Reactor) -> Result<(), CoreError> {
757        self.drain.register_with_reactor(reactor)
758    }
759
760    /// Apply one reactor readiness event to this process' stdio drain state.
761    ///
762    /// Events for unrelated tokens are ignored. Callers remain responsible for
763    /// waiting on [`Self::process`] and driving the reactor until [`Self::io_done`]
764    /// returns true.
765    pub fn handle_reactor_event(
766        &mut self,
767        reactor: &mut Reactor,
768        event: &crate::reactor::Event,
769    ) -> Result<(), CoreError> {
770        if self.drain.stdout_matches(event.token) {
771            if event.readable {
772                self.drain.handle_stdout_ready(reactor)?;
773            } else if event.error {
774                self.drain.drop_stdout(reactor)?;
775            }
776        } else if self.drain.stderr_matches(event.token) {
777            if event.readable {
778                self.drain.handle_stderr_ready(reactor)?;
779            } else if event.error {
780                self.drain.drop_stderr(reactor)?;
781            }
782        } else if self.drain.stdin_matches(event.token) {
783            if event.writable {
784                self.drain.handle_stdin_writable(reactor)?;
785            } else if event.error {
786                self.drain.drop_stdin(reactor)?;
787            }
788        }
789        Ok(())
790    }
791
792    /// Return whether all managed stdio pipes have been drained or closed.
793    pub fn io_done(&self) -> bool {
794        self.drain.is_done()
795    }
796
797    /// Consume the running process handle and return captured stdout/stderr buffers.
798    pub fn into_output_parts(self) -> (Vec<u8>, Vec<u8>) {
799        self.drain.into_parts()
800    }
801}
802
803use crate::reactor::Reactor;
804
805/// Start spawning a process and return a monitor handle.
806///
807/// This initializes the pipes and starts the process, but does not block. Use
808/// [`RunningProcess::register_with_reactor`],
809/// [`RunningProcess::handle_reactor_event`], [`RunningProcess::io_done`], and
810/// [`RunningProcess::into_output_parts`] to drive captured stdio without
811/// exposing internal drain state.
812///
813/// # Errors
814/// Returns [`CoreError`] if pipe creation, process spawning, or backend selection fails.
815pub fn spawn_start(opts: SpawnOptions) -> Result<RunningProcess, CoreError> {
816    if !opts.wait && (opts.stdin.is_some() || opts.capture_stdout || opts.capture_stderr) {
817        return Err(CoreError::sys(
818            libc::EINVAL,
819            "background I/O capture not supported (wait must be true)",
820        ));
821    }
822
823    validate_backend(&opts)?;
824
825    let (pid, drain) = match opts.backend {
826        SpawnBackend::PosixSpawn => spawn_posix_internal(opts)?,
827        SpawnBackend::Fork => spawn_fork_internal(opts)?,
828    };
829
830    Ok(RunningProcess {
831        process: Process::new(pid),
832        drain,
833    })
834}
835
836/// Spawn a process and block until completion or timeout.
837///
838/// This is the primary high-level interface for process execution. It handles
839/// the full lifecycle, including I/O multiplexing and signal management.
840///
841/// # Errors
842/// Returns [`CoreError`] if any underlying syscall (spawn, pipe, epoll) fails.
843pub fn spawn(opts: SpawnOptions) -> Result<Output, CoreError> {
844    let wait = opts.wait;
845    let timeout_ms = opts.timeout_ms;
846    let kill_grace_ms = opts.kill_grace_ms;
847    let cancel = opts.cancel;
848    let pgroup = opts.pgroup;
849
850    let mut reactor = Reactor::new()?;
851    let running = spawn_start(opts)?;
852
853    let pid = running.process.pid();
854    let mut drain = running.drain;
855
856    drain.register_with_reactor(&mut reactor)?;
857
858    if !wait {
859        let (stdout, stderr) = drain.into_parts();
860        return Ok(Output {
861            pid,
862            status: None,
863            stdout,
864            stderr,
865            timed_out: false,
866            stdout_early_exited: false,
867        });
868    }
869
870    wait_loop(
871        pid,
872        drain,
873        reactor,
874        timeout_ms,
875        kill_grace_ms,
876        cancel,
877        pgroup,
878    )
879}
880
881fn spawn_posix_internal(opts: SpawnOptions) -> Result<(pid_t, SpawnDrain), CoreError> {
882    let mut pipes = Pipes::new(
883        opts.stdin.as_deref(),
884        opts.capture_stdout,
885        opts.capture_stderr,
886    )?;
887
888    let exe_ptr = match &opts.ctx.argv {
889        ExecArgv::Dynamic(v) => v[0].as_ptr(),
890    };
891
892    let argv = opts.ctx.get_argv_ptrs();
893    let envp = opts.ctx.get_envp_ptrs();
894
895    let actions = MaybeUninit::zeroed();
896    let mut actions = unsafe { actions.assume_init() };
897    if let Err(e) = posix_ret(
898        unsafe { posix_spawn_file_actions_init(&mut actions) },
899        "file_actions_init",
900    ) {
901        pipes.close_all();
902        return Err(e);
903    }
904
905    struct Actions(*mut libc::posix_spawn_file_actions_t);
906    impl Drop for Actions {
907        fn drop(&mut self) {
908            unsafe {
909                posix_spawn_file_actions_destroy(self.0);
910            }
911        }
912    }
913    let _guard = Actions(&mut actions);
914
915    if let (Some(r), Some(w)) = (&pipes.stdin_r, &pipes.stdin_w) {
916        if let Err(e) = posix_ret(
917            unsafe { posix_spawn_file_actions_adddup2(&mut actions, r.raw(), 0) },
918            "dup2 stdin",
919        ) {
920            pipes.close_all();
921            return Err(e);
922        }
923        if let Err(e) = posix_ret(
924            unsafe { posix_spawn_file_actions_addclose(&mut actions, r.raw()) },
925            "close stdin pipe",
926        ) {
927            pipes.close_all();
928            return Err(e);
929        }
930        if let Err(e) = posix_ret(
931            unsafe { posix_spawn_file_actions_addclose(&mut actions, w.raw()) },
932            "close stdin write pipe",
933        ) {
934            pipes.close_all();
935            return Err(e);
936        }
937    }
938
939    if let (Some(r), Some(w)) = (&pipes.stdout_r, &pipes.stdout_w) {
940        if let Err(e) = posix_ret(
941            unsafe { posix_spawn_file_actions_adddup2(&mut actions, w.raw(), 1) },
942            "dup2 stdout",
943        ) {
944            pipes.close_all();
945            return Err(e);
946        }
947        if let Err(e) = posix_ret(
948            unsafe { posix_spawn_file_actions_addclose(&mut actions, w.raw()) },
949            "close stdout pipe",
950        ) {
951            pipes.close_all();
952            return Err(e);
953        }
954        if let Err(e) = posix_ret(
955            unsafe { posix_spawn_file_actions_addclose(&mut actions, r.raw()) },
956            "close stdout read pipe",
957        ) {
958            pipes.close_all();
959            return Err(e);
960        }
961    }
962
963    if let (Some(r), Some(w)) = (&pipes.stderr_r, &pipes.stderr_w) {
964        if let Err(e) = posix_ret(
965            unsafe { posix_spawn_file_actions_adddup2(&mut actions, w.raw(), 2) },
966            "dup2 stderr",
967        ) {
968            pipes.close_all();
969            return Err(e);
970        }
971        if let Err(e) = posix_ret(
972            unsafe { posix_spawn_file_actions_addclose(&mut actions, w.raw()) },
973            "close stderr pipe",
974        ) {
975            pipes.close_all();
976            return Err(e);
977        }
978        if let Err(e) = posix_ret(
979            unsafe { posix_spawn_file_actions_addclose(&mut actions, r.raw()) },
980            "close stderr read pipe",
981        ) {
982            pipes.close_all();
983            return Err(e);
984        }
985    }
986
987    let attr = MaybeUninit::zeroed();
988    let mut attr = unsafe { attr.assume_init() };
989    if let Err(e) = posix_ret(unsafe { posix_spawnattr_init(&mut attr) }, "attr_init") {
990        pipes.close_all();
991        return Err(e);
992    }
993
994    struct Attr(*mut libc::posix_spawnattr_t);
995    impl Drop for Attr {
996        fn drop(&mut self) {
997            unsafe {
998                posix_spawnattr_destroy(self.0);
999            }
1000        }
1001    }
1002    let _attr = Attr(&mut attr);
1003
1004    let mut flags = 0;
1005
1006    if let Some(pg) = opts.pgroup.leader {
1007        flags |= POSIX_SPAWN_SETPGROUP;
1008        if let Err(e) = posix_ret(
1009            unsafe { posix_spawnattr_setpgroup(&mut attr, pg) },
1010            "setpgroup",
1011        ) {
1012            pipes.close_all();
1013            return Err(e);
1014        }
1015    }
1016
1017    flags |= POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF;
1018
1019    if let Err(e) = posix_ret(
1020        unsafe { posix_spawnattr_setflags(&mut attr, flags as _) },
1021        "setflags",
1022    ) {
1023        pipes.close_all();
1024        return Err(e);
1025    }
1026
1027    let empty_mask = SignalRuntime::empty_set();
1028    let def = SignalRuntime::set_with(&[libc::SIGPIPE])?;
1029
1030    if let Err(e) = posix_ret(
1031        unsafe { posix_spawnattr_setsigmask(&mut attr, &empty_mask) },
1032        "setsigmask",
1033    ) {
1034        pipes.close_all();
1035        return Err(e);
1036    }
1037    if let Err(e) = posix_ret(
1038        unsafe { posix_spawnattr_setsigdefault(&mut attr, &def) },
1039        "setsigdefault",
1040    ) {
1041        pipes.close_all();
1042        return Err(e);
1043    }
1044
1045    let mut pid: pid_t = 0;
1046
1047    let envp_ptr = envp.as_ref().map_or_else(
1048        || unsafe { environ as *const *mut c_char },
1049        |e: &Vec<*mut c_char>| e.as_ptr(),
1050    );
1051
1052    if let Err(e) = posix_ret(
1053        unsafe { posix_spawn(&mut pid, exe_ptr, &actions, &attr, argv.as_ptr(), envp_ptr) },
1054        "posix_spawn",
1055    ) {
1056        pipes.close_all();
1057        return Err(e);
1058    }
1059
1060    drop(pipes.stdin_r.take());
1061    drop(pipes.stdout_w.take());
1062    drop(pipes.stderr_w.take());
1063
1064    let drain = crate::io::DrainState::new(
1065        pipes.stdin_w.take().filter(|_| opts.stdin.is_some()),
1066        opts.stdin,
1067        pipes.stdout_r.take(),
1068        pipes.stderr_r.take(),
1069        opts.max_output,
1070        opts.early_exit,
1071    )?;
1072
1073    Ok((pid, drain))
1074}
1075
1076fn collect_required_pipe_fds(pipes: &Pipes) -> Vec<RawFd> {
1077    let mut fds = Vec::new();
1078    if let Some(fd) = &pipes.stdin_r {
1079        fds.push(fd.raw());
1080    }
1081    if let Some(fd) = &pipes.stdin_w {
1082        fds.push(fd.raw());
1083    }
1084    if let Some(fd) = &pipes.stdout_r {
1085        fds.push(fd.raw());
1086    }
1087    if let Some(fd) = &pipes.stdout_w {
1088        fds.push(fd.raw());
1089    }
1090    if let Some(fd) = &pipes.stderr_r {
1091        fds.push(fd.raw());
1092    }
1093    if let Some(fd) = &pipes.stderr_w {
1094        fds.push(fd.raw());
1095    }
1096    fds
1097}
1098
1099fn collect_open_fds_for_child_policy(policy: &SpawnFdPolicy) -> Result<Vec<RawFd>, CoreError> {
1100    match policy {
1101        SpawnFdPolicy::CloexecOnly => Ok(Vec::new()),
1102        SpawnFdPolicy::CloseFrom3 | SpawnFdPolicy::Allowlist(_) => {
1103            let dir_fd = unsafe {
1104                libc::open(
1105                    c"/proc/self/fd".as_ptr(),
1106                    libc::O_RDONLY | libc::O_DIRECTORY | libc::O_CLOEXEC,
1107                )
1108            };
1109            if dir_fd < 0 {
1110                return Err(CoreError::sys(errno(), "open /proc/self/fd"));
1111            }
1112
1113            let dir = unsafe { libc::fdopendir(dir_fd) };
1114            if dir.is_null() {
1115                let code = errno();
1116                unsafe {
1117                    libc::close(dir_fd);
1118                }
1119                return Err(CoreError::sys(code, "fdopendir /proc/self/fd"));
1120            }
1121
1122            let mut open_fds = Vec::new();
1123            loop {
1124                let entry = unsafe { libc::readdir(dir) };
1125                if entry.is_null() {
1126                    break;
1127                }
1128                let name = unsafe { std::ffi::CStr::from_ptr((*entry).d_name.as_ptr()) };
1129                if let Ok(s) = name.to_str()
1130                    && let Ok(fd) = s.parse::<RawFd>()
1131                    && fd != dir_fd
1132                {
1133                    open_fds.push(fd);
1134                }
1135            }
1136            unsafe {
1137                libc::closedir(dir);
1138            }
1139            Ok(open_fds)
1140        }
1141    }
1142}
1143
1144fn close_child_fds_for_policy(policy: &SpawnFdPolicy, required_fds: &[RawFd], open_fds: &[RawFd]) {
1145    match policy {
1146        SpawnFdPolicy::CloexecOnly => {}
1147        SpawnFdPolicy::CloseFrom3 | SpawnFdPolicy::Allowlist(_) => {
1148            for &fd in open_fds {
1149                if fd > 2
1150                    && !required_fds.contains(&fd)
1151                    && !matches!(policy, SpawnFdPolicy::Allowlist(allowlist) if allowlist.contains(&fd))
1152                {
1153                    unsafe {
1154                        libc::close(fd);
1155                    }
1156                }
1157            }
1158        }
1159    }
1160}
1161
1162fn spawn_fork_internal(opts: SpawnOptions) -> Result<(pid_t, SpawnDrain), CoreError> {
1163    let mut pipes = Pipes::new(
1164        opts.stdin.as_deref(),
1165        opts.capture_stdout,
1166        opts.capture_stderr,
1167    )?;
1168
1169    let exe_ptr = match &opts.ctx.argv {
1170        ExecArgv::Dynamic(v) => v[0].as_ptr(),
1171    };
1172
1173    let argv = opts.ctx.get_argv_ptrs();
1174    let envp = opts.ctx.get_envp_ptrs();
1175    let cwd_cstr = &opts.ctx.cwd;
1176    let (child_error_r, child_error_w) = make_cloexec_pipe()?;
1177    let mut required_fds = collect_required_pipe_fds(&pipes);
1178    required_fds.push(child_error_w);
1179    let open_fds = collect_open_fds_for_child_policy(&opts.fd_policy)?;
1180
1181    let pid = unsafe { libc::fork() };
1182
1183    if pid < 0 {
1184        unsafe {
1185            libc::close(child_error_r);
1186            libc::close(child_error_w);
1187        }
1188        pipes.close_all();
1189        syscall_ret(-1, "fork")?;
1190    }
1191
1192    if pid == 0 {
1193        // Child
1194        unsafe {
1195            libc::close(child_error_r);
1196        }
1197
1198        // dup stdin
1199        if let (Some(r), Some(_)) = (&pipes.stdin_r, &pipes.stdin_w) {
1200            unsafe {
1201                if libc::dup2(r.raw(), 0) < 0 {
1202                    report_child_setup_error(child_error_w, ChildSetupOp::DupStdin, errno());
1203                }
1204            }
1205        }
1206
1207        // dup stdout
1208        if let (Some(_), Some(w)) = (&pipes.stdout_r, &pipes.stdout_w) {
1209            unsafe {
1210                if libc::dup2(w.raw(), 1) < 0 {
1211                    report_child_setup_error(child_error_w, ChildSetupOp::DupStdout, errno());
1212                }
1213            }
1214        }
1215
1216        // dup stderr
1217        if let (Some(_), Some(w)) = (&pipes.stderr_r, &pipes.stderr_w) {
1218            unsafe {
1219                if libc::dup2(w.raw(), 2) < 0 {
1220                    report_child_setup_error(child_error_w, ChildSetupOp::DupStderr, errno());
1221                }
1222            }
1223        }
1224
1225        // SAFETY: Close all pipe FDs in child before exec, except the ones duped to 0,1,2.
1226        pipes.close_all();
1227
1228        close_child_fds_for_policy(&opts.fd_policy, &required_fds, &open_fds);
1229
1230        // setsid
1231        if opts.pgroup.isolated {
1232            // SAFETY: safe to call setsid in child.
1233            unsafe {
1234                if libc::setsid() < 0 {
1235                    report_child_setup_error(child_error_w, ChildSetupOp::Setsid, errno());
1236                }
1237            }
1238        }
1239
1240        // chdir
1241        if let Some(cwd) = cwd_cstr {
1242            // SAFETY: cwd is a valid null-terminated CString.
1243            unsafe {
1244                if libc::chdir(cwd.as_ptr()) != 0 {
1245                    report_child_setup_error(child_error_w, ChildSetupOp::Chdir, errno());
1246                }
1247            }
1248        }
1249
1250        // setpgid
1251        if let Some(pg) = opts.pgroup.leader {
1252            // SAFETY: valid pgroup.
1253            unsafe {
1254                if libc::setpgid(0, pg) < 0 {
1255                    report_child_setup_error(child_error_w, ChildSetupOp::Setpgid, errno());
1256                }
1257            }
1258        }
1259
1260        let envp_ptr = envp.as_ref().map_or_else(
1261            || unsafe { environ as *const *mut c_char },
1262            |e: &Vec<*mut c_char>| e.as_ptr(),
1263        );
1264
1265        // unblock signals and reset SIGPIPE
1266        // SAFETY: valid signal mask array manipulation
1267        if let Err(err) = SignalRuntime::unblock_all() {
1268            unsafe {
1269                report_child_setup_error(
1270                    child_error_w,
1271                    ChildSetupOp::SignalMask,
1272                    err.raw_os_error().unwrap_or(libc::EIO),
1273                );
1274            }
1275        }
1276        if let Err(err) = SignalRuntime::reset_default(libc::SIGPIPE) {
1277            unsafe {
1278                report_child_setup_error(
1279                    child_error_w,
1280                    ChildSetupOp::SignalMask,
1281                    err.raw_os_error().unwrap_or(libc::EIO),
1282                );
1283            }
1284        }
1285
1286        // exec
1287        // SAFETY: exe_ptr is null-terminated. argv and envp_ptr are valid null-terminated arrays.
1288        unsafe {
1289            libc::execve(
1290                exe_ptr,
1291                argv.as_ptr() as *const *const _,
1292                envp_ptr as *const *const _,
1293            );
1294            report_child_setup_error(child_error_w, ChildSetupOp::Execve, errno());
1295        }
1296    }
1297
1298    // Parent
1299    unsafe {
1300        libc::close(child_error_w);
1301    }
1302    match read_child_setup_error(child_error_r) {
1303        Ok(Some(err)) => {
1304            unsafe {
1305                libc::close(child_error_r);
1306                let mut status = 0;
1307                let _ = libc::waitpid(pid, &mut status, 0);
1308            }
1309            pipes.close_all();
1310            return Err(err);
1311        }
1312        Ok(None) => {}
1313        Err(err) => {
1314            unsafe {
1315                libc::close(child_error_r);
1316            }
1317            pipes.close_all();
1318            return Err(err);
1319        }
1320    }
1321    unsafe {
1322        libc::close(child_error_r);
1323    }
1324    drop(pipes.stdin_r.take());
1325    drop(pipes.stdout_w.take());
1326    drop(pipes.stderr_w.take());
1327
1328    let drain = crate::io::DrainState::new(
1329        pipes.stdin_w.take().filter(|_| opts.stdin.is_some()),
1330        opts.stdin,
1331        pipes.stdout_r.take(),
1332        pipes.stderr_r.take(),
1333        opts.max_output,
1334        opts.early_exit,
1335    )?;
1336
1337    Ok((pid, drain))
1338}
1339
/// Progress of the timeout-driven termination sequence for a spawned process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KillState {
    /// No signal has been sent yet.
    None,
    /// `SIGTERM` has been sent; `SIGKILL` may follow.
    TermSent,
    /// `SIGKILL` has been sent, or signalling was found to be pointless.
    KillSent,
}
1345
1346fn wait_loop(
1347    pid: pid_t,
1348    mut drain: crate::io::DrainState<fn(&[u8]) -> bool>,
1349    mut reactor: Reactor,
1350    timeout_ms: Option<u32>,
1351    kill_grace_ms: u32,
1352    cancel: CancelPolicy,
1353    pgroup: ProcessGroup,
1354) -> Result<Output, CoreError> {
1355    let process = Process::new(pid);
1356    let mut status_raw = process.wait_step()?;
1357    let mut state = KillState::None;
1358    let mut timed_out = false;
1359
1360    let start_time = std::time::Instant::now();
1361    let deadline = timeout_ms.map(|t| std::time::Duration::from_millis(t as u64));
1362
1363    loop {
1364        let mut poll_timeout = -1;
1365
1366        if let Some(dl) = deadline {
1367            let elapsed = start_time.elapsed();
1368            if elapsed >= dl {
1369                timed_out = true;
1370                let elapsed_over = (elapsed - dl).as_millis();
1371
1372                let target_is_group = pgroup.isolated || pgroup.leader.is_some();
1373
1374                match state {
1375                    KillState::None => {
1376                        if cancel == CancelPolicy::Graceful {
1377                            let r = if target_is_group {
1378                                process.kill_pgroup(libc::SIGTERM)
1379                            } else {
1380                                process.kill(libc::SIGTERM)
1381                            };
1382                            if r.is_err() {
1383                                state = KillState::KillSent; // Process already gone
1384                            } else {
1385                                state = KillState::TermSent;
1386                            }
1387                        } else if cancel == CancelPolicy::Kill {
1388                            let _ = if target_is_group {
1389                                process.kill_pgroup(libc::SIGKILL)
1390                            } else {
1391                                process.kill(libc::SIGKILL)
1392                            };
1393                            state = KillState::KillSent;
1394                        } else {
1395                            // CancelPolicy::None just times out without killing
1396                        }
1397                    }
1398                    KillState::TermSent if elapsed_over > kill_grace_ms as u128 => {
1399                        let _ = if target_is_group {
1400                            process.kill_pgroup(libc::SIGKILL)
1401                        } else {
1402                            process.kill(libc::SIGKILL)
1403                        };
1404                        state = KillState::KillSent;
1405                    }
1406                    _ => {}
1407                }
1408                poll_timeout = 100; // Poll frequently while waiting for kill to take effect
1409            } else {
1410                let remaining = dl - elapsed;
1411                poll_timeout = remaining.as_millis().min(i32::MAX as u128) as i32;
1412            }
1413        }
1414
1415        if status_raw.is_none()
1416            && let Some(s) = process.wait_step()?
1417        {
1418            status_raw = Some(s);
1419        }
1420
1421        if drain.is_done() {
1422            let s = match status_raw {
1423                Some(s) => s,
1424                None => process.wait_blocking()?,
1425            };
1426
1427            for slot in drain.take_all_slots() {
1428                reactor.del(&slot.fd)?;
1429            }
1430            let (stdout, stderr, output_limit_exceeded, stdout_early_exited) =
1431                drain.into_parts_with_state();
1432            if output_limit_exceeded {
1433                return Err(CoreError::sys(libc::EOVERFLOW, "spawn output limit"));
1434            }
1435            return Ok(Output {
1436                pid,
1437                status: Some(s),
1438                stdout,
1439                stderr,
1440                timed_out,
1441                stdout_early_exited,
1442            });
1443        }
1444
1445        let timeout = if status_raw.is_some() {
1446            if poll_timeout == -1 || poll_timeout > 1 {
1447                1
1448            } else {
1449                poll_timeout
1450            }
1451        } else {
1452            poll_timeout
1453        };
1454
1455        let mut events = Vec::new();
1456        let nevents = reactor.wait(&mut events, 64, timeout)?;
1457
1458        for ev in events.iter().take(nevents) {
1459            if drain.stdout_matches(ev.token) {
1460                if ev.readable {
1461                    drain.handle_stdout_ready(&mut reactor)?;
1462                } else if ev.error {
1463                    drain.drop_stdout(&mut reactor)?;
1464                }
1465            } else if drain.stderr_matches(ev.token) {
1466                if ev.readable {
1467                    drain.handle_stderr_ready(&mut reactor)?;
1468                } else if ev.error {
1469                    drain.drop_stderr(&mut reactor)?;
1470                }
1471            } else if drain.stdin_matches(ev.token) {
1472                if ev.writable {
1473                    drain.handle_stdin_writable(&mut reactor)?;
1474                } else if ev.error {
1475                    drain.drop_stdin(&mut reactor)?;
1476                }
1477            }
1478        }
1479    }
1480}