// coreshift_core/io/drain.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
//! High-level process I/O management.
//!
//! This module provides the [`DrainState`] structure, which coordinates the
//! simultaneous reading from process output pipes and writing to process
//! input pipes.
//!
//! This is an advanced helper for callers that already own child-process file
//! descriptors and want non-blocking drain semantics without reimplementing
//! the bookkeeping.
14
15use crate::CoreError;
16use crate::io::buffer::{BufferState, ReadState};
17use crate::io::writer::WriterState;
18use crate::reactor::{Fd, Token};
19
20/// Associates a file descriptor with an optional reactor token.
21pub(crate) struct FdSlot {
22    /// Token assigned by the reactor for this descriptor.
23    pub token: Option<Token>,
24    /// The managed file descriptor.
25    pub fd: Fd,
26}
27
28/// Orchestrates non-blocking process I/O.
29///
30/// `DrainState` tracks the state of stdin, stdout, and stderr pipes for a
31/// single process. It handles the multiplexing of data between these pipes
32/// and internal buffers.
33///
34/// # Example
35/// ```no_run
36/// # use coreshift_core::io::DrainState;
37/// # use coreshift_core::reactor::Reactor;
38/// # fn example(mut drain: DrainState<fn(&[u8]) -> bool>, mut reactor: Reactor) -> Result<(), Box<dyn std::error::Error>> {
39/// while !drain.is_done() {
40///     let mut events = Vec::new();
41///     reactor.wait(&mut events, 64, -1)?;
42///     for ev in events {
43///         // Map event tokens to drain calls...
44///     }
45/// }
46/// # Ok(())
47/// # }
48/// ```
49#[repr(align(64))]
50pub struct DrainState<F>
51where
52    F: FnMut(&[u8]) -> bool,
53{
54    pub(crate) stdout_slot: Option<FdSlot>,
55    pub(crate) stderr_slot: Option<FdSlot>,
56    pub(crate) stdin_slot: Option<FdSlot>,
57
58    pub(crate) buffer: BufferState,
59    pub(crate) writer: WriterState,
60
61    pub(crate) early_exit: Option<F>,
62}
63
64impl<F> DrainState<F>
65where
66    F: FnMut(&[u8]) -> bool,
67{
68    /// Initialize a new drain state for the provided descriptors.
69    ///
70    /// This consumes the descriptors and sets them to non-blocking mode.
71    pub fn new(
72        stdin_fd: Option<Fd>,
73        stdin_buf: Option<Box<[u8]>>,
74        stdout_fd: Option<Fd>,
75        stderr_fd: Option<Fd>,
76        limit: usize,
77        early_exit: Option<F>,
78    ) -> Result<Self, CoreError> {
79        let stdin_slot = if stdin_buf.is_some() {
80            if let Some(fd) = stdin_fd {
81                fd.set_nonblock()?;
82                Some(FdSlot { token: None, fd })
83            } else {
84                None
85            }
86        } else {
87            None
88        };
89
90        let stdout_slot = if let Some(fd) = stdout_fd {
91            fd.set_nonblock()?;
92            Some(FdSlot { token: None, fd })
93        } else {
94            None
95        };
96
97        let stderr_slot = if let Some(fd) = stderr_fd {
98            fd.set_nonblock()?;
99            Some(FdSlot { token: None, fd })
100        } else {
101            None
102        };
103
104        Ok(Self {
105            stdin_slot,
106            stdout_slot,
107            stderr_slot,
108            buffer: BufferState::new(limit),
109            writer: WriterState::new(stdin_buf),
110            early_exit,
111        })
112    }
113
114    /// Returns `true` if all pipes have been closed or fully drained.
115    #[inline(always)]
116    pub fn is_done(&self) -> bool {
117        self.stdin_slot.is_none() && self.stdout_slot.is_none() && self.stderr_slot.is_none()
118    }
119
120    /// Perform a non-blocking write to stdin if pending.
121    #[inline(always)]
122    pub fn write_stdin(&mut self) -> Result<bool, CoreError> {
123        let fd = if let Some(s) = &self.stdin_slot {
124            &s.fd
125        } else {
126            return Ok(true);
127        };
128
129        let done = self.writer.write_to_fd(fd)?;
130        if done {
131            self.stdin_slot.take();
132            return Ok(true);
133        }
134        Ok(false)
135    }
136
137    /// Perform a non-blocking read from stdout or stderr.
138    #[inline(always)]
139    pub fn read_fd(&mut self, is_stdout: bool) -> Result<bool, CoreError> {
140        let read_state = {
141            let slot = if is_stdout {
142                &self.stdout_slot
143            } else {
144                &self.stderr_slot
145            };
146            let fd = if let Some(s) = slot {
147                &s.fd
148            } else {
149                return Ok(true);
150            };
151            self.buffer
152                .read_from_fd(fd, is_stdout, &mut self.early_exit)?
153        };
154
155        if read_state != ReadState::Open {
156            if is_stdout {
157                self.stdout_slot.take();
158                return Ok(true);
159            } else {
160                self.stderr_slot.take();
161                return Ok(true);
162            }
163        }
164
165        Ok(false)
166    }
167
168    /// Extract all active slots for cleanup or reactor removal.
169    pub(crate) fn take_all_slots(&mut self) -> Vec<FdSlot> {
170        let mut slots = Vec::new();
171        if let Some(slot) = self.stdin_slot.take() {
172            slots.push(slot);
173        }
174        if let Some(slot) = self.stdout_slot.take() {
175            slots.push(slot);
176        }
177        if let Some(slot) = self.stderr_slot.take() {
178            slots.push(slot);
179        }
180        slots
181    }
182
183    pub(crate) fn register_with_reactor(
184        &mut self,
185        reactor: &mut crate::reactor::Reactor,
186    ) -> Result<(), CoreError> {
187        if let Some(mut slot) = self.stdin_slot.take() {
188            slot.token = Some(reactor.add(&slot.fd, false, true)?);
189            self.stdin_slot = Some(slot);
190        }
191        if let Some(mut slot) = self.stdout_slot.take() {
192            slot.token = Some(reactor.add(&slot.fd, true, false)?);
193            self.stdout_slot = Some(slot);
194        }
195        if let Some(mut slot) = self.stderr_slot.take() {
196            slot.token = Some(reactor.add(&slot.fd, true, false)?);
197            self.stderr_slot = Some(slot);
198        }
199        Ok(())
200    }
201
202    pub(crate) fn stdout_matches(&self, token: Token) -> bool {
203        self.stdout_slot
204            .as_ref()
205            .is_some_and(|slot| slot.token == Some(token))
206    }
207
208    pub(crate) fn stderr_matches(&self, token: Token) -> bool {
209        self.stderr_slot
210            .as_ref()
211            .is_some_and(|slot| slot.token == Some(token))
212    }
213
214    pub(crate) fn stdin_matches(&self, token: Token) -> bool {
215        self.stdin_slot
216            .as_ref()
217            .is_some_and(|slot| slot.token == Some(token))
218    }
219
220    pub(crate) fn drop_stdout(
221        &mut self,
222        reactor: &mut crate::reactor::Reactor,
223    ) -> Result<(), CoreError> {
224        if let Some(slot) = self.stdout_slot.take() {
225            reactor.del(&slot.fd)?;
226        }
227        Ok(())
228    }
229
230    pub(crate) fn drop_stderr(
231        &mut self,
232        reactor: &mut crate::reactor::Reactor,
233    ) -> Result<(), CoreError> {
234        if let Some(slot) = self.stderr_slot.take() {
235            reactor.del(&slot.fd)?;
236        }
237        Ok(())
238    }
239
240    pub(crate) fn drop_stdin(
241        &mut self,
242        reactor: &mut crate::reactor::Reactor,
243    ) -> Result<(), CoreError> {
244        if let Some(slot) = self.stdin_slot.take() {
245            reactor.del(&slot.fd)?;
246        }
247        self.writer.buf = None;
248        Ok(())
249    }
250
251    pub(crate) fn handle_stdout_ready(
252        &mut self,
253        reactor: &mut crate::reactor::Reactor,
254    ) -> Result<(), CoreError> {
255        if let Some(slot) = &self.stdout_slot {
256            let read_state = self
257                .buffer
258                .read_from_fd(&slot.fd, true, &mut self.early_exit)?;
259            if read_state != ReadState::Open {
260                self.drop_stdout(reactor)?;
261            }
262        }
263        Ok(())
264    }
265
266    pub(crate) fn handle_stderr_ready(
267        &mut self,
268        reactor: &mut crate::reactor::Reactor,
269    ) -> Result<(), CoreError> {
270        if let Some(slot) = &self.stderr_slot {
271            let read_state = self
272                .buffer
273                .read_from_fd(&slot.fd, false, &mut self.early_exit)?;
274            if read_state != ReadState::Open {
275                self.drop_stderr(reactor)?;
276            }
277        }
278        Ok(())
279    }
280
281    pub(crate) fn handle_stdin_writable(
282        &mut self,
283        reactor: &mut crate::reactor::Reactor,
284    ) -> Result<(), CoreError> {
285        if let Some(slot) = &self.stdin_slot {
286            let done = self.writer.write_to_fd(&slot.fd)?;
287            if done {
288                self.drop_stdin(reactor)?;
289            }
290        }
291        Ok(())
292    }
293
294    /// Consume the state and return (stdout, stderr) buffers.
295    pub fn into_parts(mut self) -> (Vec<u8>, Vec<u8>) {
296        let (stdout, stderr, _, _) = std::mem::take(&mut self.buffer).into_parts();
297        (stdout, stderr)
298    }
299
300    /// Return whether the combined stdout+stderr output limit was exceeded.
301    #[inline(always)]
302    pub fn output_limit_exceeded(&self) -> bool {
303        self.buffer.output_limit_exceeded()
304    }
305
306    /// Return whether stdout was explicitly stopped by the early-exit predicate.
307    #[inline(always)]
308    pub fn stdout_early_exited(&self) -> bool {
309        self.buffer.stdout_early_exited()
310    }
311
312    /// Consume the state and return buffers plus drain flags.
313    pub(crate) fn into_parts_with_state(mut self) -> (Vec<u8>, Vec<u8>, bool, bool) {
314        std::mem::take(&mut self.buffer).into_parts()
315    }
316}