Skip to main content

coreshift_core/io/
drain.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! High-level process I/O management.
6//!
7//! This module provides the [`DrainState`] structure, which coordinates the
8//! simultaneous reading from process output pipes and writing to process
9//! input pipes.
10//!
11//! This is an advanced helper for callers that already own child-process file
12//! descriptors and want non-blocking drain semantics without reimplementing
13//! the bookkeeping.
14
15use crate::CoreError;
16use crate::io::buffer::{BufferState, ReadState};
17use crate::io::writer::WriterState;
18use crate::reactor::{Fd, Token};
19
20/// Associates a file descriptor with an optional reactor token.
21pub(crate) struct FdSlot {
22    /// Token assigned by the reactor for this descriptor.
23    pub token: Option<Token>,
24    /// The managed file descriptor.
25    pub fd: Fd,
26}
27
28/// Orchestrates non-blocking process I/O.
29///
30/// `DrainState` tracks the state of stdin, stdout, and stderr pipes for a
31/// single process. It handles the multiplexing of data between these pipes
32/// and internal buffers.
33///
34/// # Example
35/// ```no_run
36/// # use coreshift_core::io::DrainState;
37/// # use coreshift_core::reactor::Reactor;
38/// # fn example(mut drain: DrainState<fn(&[u8]) -> bool>, mut reactor: Reactor) -> Result<(), Box<dyn std::error::Error>> {
39/// while !drain.is_done() {
40///     let mut events = Vec::new();
41///     reactor.wait(&mut events, 64, -1)?;
42///     for ev in events {
43///         // Map event tokens to drain calls...
44///     }
45/// }
46/// # Ok(())
47/// # }
48/// ```
49#[repr(align(64))]
50pub struct DrainState<F>
51where
52    F: FnMut(&[u8]) -> bool,
53{
54    pub(crate) stdout_slot: Option<FdSlot>,
55    pub(crate) stderr_slot: Option<FdSlot>,
56    pub(crate) stdin_slot: Option<FdSlot>,
57
58    pub(crate) buffer: BufferState,
59    pub(crate) writer: WriterState,
60
61    pub(crate) early_exit: Option<F>,
62}
63
64impl<F> DrainState<F>
65where
66    F: FnMut(&[u8]) -> bool,
67{
68    /// Initialize a new drain state for the provided descriptors.
69    ///
70    /// This consumes the descriptors and sets them to non-blocking mode.
71    ///
72    /// ### Errors
73    /// - `EBADF`: One of the provided file descriptors is invalid.
74    pub fn new(
75        stdin_fd: Option<Fd>,
76        stdin_buf: Option<Box<[u8]>>,
77        stdout_fd: Option<Fd>,
78        stderr_fd: Option<Fd>,
79        limit: usize,
80        early_exit: Option<F>,
81    ) -> Result<Self, CoreError> {
82        let stdin_slot = if stdin_buf.is_some() {
83            if let Some(fd) = stdin_fd {
84                fd.set_nonblock()?;
85                Some(FdSlot { token: None, fd })
86            } else {
87                None
88            }
89        } else {
90            None
91        };
92
93        let stdout_slot = if let Some(fd) = stdout_fd {
94            fd.set_nonblock()?;
95            Some(FdSlot { token: None, fd })
96        } else {
97            None
98        };
99
100        let stderr_slot = if let Some(fd) = stderr_fd {
101            fd.set_nonblock()?;
102            Some(FdSlot { token: None, fd })
103        } else {
104            None
105        };
106
107        Ok(Self {
108            stdin_slot,
109            stdout_slot,
110            stderr_slot,
111            buffer: BufferState::new(limit),
112            writer: WriterState::new(stdin_buf),
113            early_exit,
114        })
115    }
116
117    /// Returns `true` if all pipes have been closed or fully drained.
118    #[inline(always)]
119    pub fn is_done(&self) -> bool {
120        self.stdin_slot.is_none() && self.stdout_slot.is_none() && self.stderr_slot.is_none()
121    }
122
123    /// Perform a non-blocking write to stdin if pending.
124    ///
125    /// Returns `Ok(true)` if the write buffer is empty or the descriptor is
126    /// closed.
127    ///
128    /// ### Errors
129    /// - `EPIPE`: The child process closed its reading end of the pipe.
130    /// - `EIO`: Low-level I/O error.
131    #[inline(always)]
132    pub fn write_stdin(&mut self) -> Result<bool, CoreError> {
133        let fd = if let Some(s) = &self.stdin_slot {
134            &s.fd
135        } else {
136            return Ok(true);
137        };
138
139        let done = self.writer.write_to_fd(fd)?;
140        if done {
141            self.stdin_slot.take();
142            return Ok(true);
143        }
144        Ok(false)
145    }
146
147    /// Perform a non-blocking read from stdout or stderr.
148    ///
149    /// Returns `Ok(true)` if the stream reached EOF or the early-exit condition
150    /// was met.
151    ///
152    /// ### Errors
153    /// - `EOVERFLOW`: The captured output exceeded the specified limit.
154    /// - `EIO`: Low-level I/O error.
155    #[inline(always)]
156    pub fn read_fd(&mut self, is_stdout: bool) -> Result<bool, CoreError> {
157        let read_state = {
158            let slot = if is_stdout {
159                &self.stdout_slot
160            } else {
161                &self.stderr_slot
162            };
163            let fd = if let Some(s) = slot {
164                &s.fd
165            } else {
166                return Ok(true);
167            };
168            self.buffer
169                .read_from_fd(fd, is_stdout, &mut self.early_exit)?
170        };
171
172        if read_state != ReadState::Open {
173            if is_stdout {
174                self.stdout_slot.take();
175                return Ok(true);
176            } else {
177                self.stderr_slot.take();
178                return Ok(true);
179            }
180        }
181
182        Ok(false)
183    }
184
185    /// Extract all active slots for cleanup or reactor removal.
186    pub(crate) fn take_all_slots(&mut self) -> Vec<FdSlot> {
187        let mut slots = Vec::new();
188        if let Some(slot) = self.stdin_slot.take() {
189            slots.push(slot);
190        }
191        if let Some(slot) = self.stdout_slot.take() {
192            slots.push(slot);
193        }
194        if let Some(slot) = self.stderr_slot.take() {
195            slots.push(slot);
196        }
197        slots
198    }
199
200    pub(crate) fn register_with_reactor(
201        &mut self,
202        reactor: &mut crate::reactor::Reactor,
203    ) -> Result<(), CoreError> {
204        if let Some(mut slot) = self.stdin_slot.take() {
205            slot.token = Some(reactor.add(&slot.fd, false, true)?);
206            self.stdin_slot = Some(slot);
207        }
208        if let Some(mut slot) = self.stdout_slot.take() {
209            slot.token = Some(reactor.add(&slot.fd, true, false)?);
210            self.stdout_slot = Some(slot);
211        }
212        if let Some(mut slot) = self.stderr_slot.take() {
213            slot.token = Some(reactor.add(&slot.fd, true, false)?);
214            self.stderr_slot = Some(slot);
215        }
216        Ok(())
217    }
218
219    pub(crate) fn stdout_matches(&self, token: Token) -> bool {
220        self.stdout_slot
221            .as_ref()
222            .is_some_and(|slot| slot.token == Some(token))
223    }
224
225    pub(crate) fn stderr_matches(&self, token: Token) -> bool {
226        self.stderr_slot
227            .as_ref()
228            .is_some_and(|slot| slot.token == Some(token))
229    }
230
231    pub(crate) fn stdin_matches(&self, token: Token) -> bool {
232        self.stdin_slot
233            .as_ref()
234            .is_some_and(|slot| slot.token == Some(token))
235    }
236
237    pub(crate) fn drop_stdout(
238        &mut self,
239        reactor: &mut crate::reactor::Reactor,
240    ) -> Result<(), CoreError> {
241        if let Some(slot) = self.stdout_slot.take() {
242            reactor.del(&slot.fd)?;
243        }
244        Ok(())
245    }
246
247    pub(crate) fn drop_stderr(
248        &mut self,
249        reactor: &mut crate::reactor::Reactor,
250    ) -> Result<(), CoreError> {
251        if let Some(slot) = self.stderr_slot.take() {
252            reactor.del(&slot.fd)?;
253        }
254        Ok(())
255    }
256
257    pub(crate) fn drop_stdin(
258        &mut self,
259        reactor: &mut crate::reactor::Reactor,
260    ) -> Result<(), CoreError> {
261        if let Some(slot) = self.stdin_slot.take() {
262            reactor.del(&slot.fd)?;
263        }
264        self.writer.buf = None;
265        Ok(())
266    }
267
268    pub(crate) fn handle_stdout_ready(
269        &mut self,
270        reactor: &mut crate::reactor::Reactor,
271    ) -> Result<(), CoreError> {
272        if let Some(slot) = &self.stdout_slot {
273            let read_state = self
274                .buffer
275                .read_from_fd(&slot.fd, true, &mut self.early_exit)?;
276            if read_state != ReadState::Open {
277                self.drop_stdout(reactor)?;
278            }
279        }
280        Ok(())
281    }
282
283    pub(crate) fn handle_stderr_ready(
284        &mut self,
285        reactor: &mut crate::reactor::Reactor,
286    ) -> Result<(), CoreError> {
287        if let Some(slot) = &self.stderr_slot {
288            let read_state = self
289                .buffer
290                .read_from_fd(&slot.fd, false, &mut self.early_exit)?;
291            if read_state != ReadState::Open {
292                self.drop_stderr(reactor)?;
293            }
294        }
295        Ok(())
296    }
297
298    pub(crate) fn handle_stdin_writable(
299        &mut self,
300        reactor: &mut crate::reactor::Reactor,
301    ) -> Result<(), CoreError> {
302        if let Some(slot) = &self.stdin_slot {
303            let done = self.writer.write_to_fd(&slot.fd)?;
304            if done {
305                self.drop_stdin(reactor)?;
306            }
307        }
308        Ok(())
309    }
310
311    /// Consume the state and return (stdout, stderr) buffers.
312    pub fn into_parts(mut self) -> (Vec<u8>, Vec<u8>) {
313        let (stdout, stderr, _, _) = std::mem::take(&mut self.buffer).into_parts();
314        (stdout, stderr)
315    }
316
317    /// Return whether the combined stdout+stderr output limit was exceeded.
318    #[inline(always)]
319    pub fn output_limit_exceeded(&self) -> bool {
320        self.buffer.output_limit_exceeded()
321    }
322
323    /// Return whether stdout was explicitly stopped by the early-exit predicate.
324    #[inline(always)]
325    pub fn stdout_early_exited(&self) -> bool {
326        self.buffer.stdout_early_exited()
327    }
328
329    /// Consume the state and return buffers plus drain flags.
330    pub(crate) fn into_parts_with_state(mut self) -> (Vec<u8>, Vec<u8>, bool, bool) {
331        std::mem::take(&mut self.buffer).into_parts()
332    }
333}