coreshift_core/io/
drain.rs1use crate::CoreError;
16use crate::io::buffer::{BufferState, ReadState};
17use crate::io::writer::WriterState;
18use crate::reactor::{Fd, Token};
19
20pub(crate) struct FdSlot {
22 pub token: Option<Token>,
24 pub fd: Fd,
26}
27
28#[repr(align(64))]
50pub struct DrainState<F>
51where
52 F: FnMut(&[u8]) -> bool,
53{
54 pub(crate) stdout_slot: Option<FdSlot>,
55 pub(crate) stderr_slot: Option<FdSlot>,
56 pub(crate) stdin_slot: Option<FdSlot>,
57
58 pub(crate) buffer: BufferState,
59 pub(crate) writer: WriterState,
60
61 pub(crate) early_exit: Option<F>,
62}
63
64impl<F> DrainState<F>
65where
66 F: FnMut(&[u8]) -> bool,
67{
68 pub fn new(
75 stdin_fd: Option<Fd>,
76 stdin_buf: Option<Box<[u8]>>,
77 stdout_fd: Option<Fd>,
78 stderr_fd: Option<Fd>,
79 limit: usize,
80 early_exit: Option<F>,
81 ) -> Result<Self, CoreError> {
82 let stdin_slot = if stdin_buf.is_some() {
83 if let Some(fd) = stdin_fd {
84 fd.set_nonblock()?;
85 Some(FdSlot { token: None, fd })
86 } else {
87 None
88 }
89 } else {
90 None
91 };
92
93 let stdout_slot = if let Some(fd) = stdout_fd {
94 fd.set_nonblock()?;
95 Some(FdSlot { token: None, fd })
96 } else {
97 None
98 };
99
100 let stderr_slot = if let Some(fd) = stderr_fd {
101 fd.set_nonblock()?;
102 Some(FdSlot { token: None, fd })
103 } else {
104 None
105 };
106
107 Ok(Self {
108 stdin_slot,
109 stdout_slot,
110 stderr_slot,
111 buffer: BufferState::new(limit),
112 writer: WriterState::new(stdin_buf),
113 early_exit,
114 })
115 }
116
117 #[inline(always)]
119 pub fn is_done(&self) -> bool {
120 self.stdin_slot.is_none() && self.stdout_slot.is_none() && self.stderr_slot.is_none()
121 }
122
123 #[inline(always)]
132 pub fn write_stdin(&mut self) -> Result<bool, CoreError> {
133 let fd = if let Some(s) = &self.stdin_slot {
134 &s.fd
135 } else {
136 return Ok(true);
137 };
138
139 let done = self.writer.write_to_fd(fd)?;
140 if done {
141 self.stdin_slot.take();
142 return Ok(true);
143 }
144 Ok(false)
145 }
146
147 #[inline(always)]
156 pub fn read_fd(&mut self, is_stdout: bool) -> Result<bool, CoreError> {
157 let read_state = {
158 let slot = if is_stdout {
159 &self.stdout_slot
160 } else {
161 &self.stderr_slot
162 };
163 let fd = if let Some(s) = slot {
164 &s.fd
165 } else {
166 return Ok(true);
167 };
168 self.buffer
169 .read_from_fd(fd, is_stdout, &mut self.early_exit)?
170 };
171
172 if read_state != ReadState::Open {
173 if is_stdout {
174 self.stdout_slot.take();
175 return Ok(true);
176 } else {
177 self.stderr_slot.take();
178 return Ok(true);
179 }
180 }
181
182 Ok(false)
183 }
184
185 pub(crate) fn take_all_slots(&mut self) -> Vec<FdSlot> {
187 let mut slots = Vec::new();
188 if let Some(slot) = self.stdin_slot.take() {
189 slots.push(slot);
190 }
191 if let Some(slot) = self.stdout_slot.take() {
192 slots.push(slot);
193 }
194 if let Some(slot) = self.stderr_slot.take() {
195 slots.push(slot);
196 }
197 slots
198 }
199
200 pub(crate) fn register_with_reactor(
201 &mut self,
202 reactor: &mut crate::reactor::Reactor,
203 ) -> Result<(), CoreError> {
204 if let Some(mut slot) = self.stdin_slot.take() {
205 slot.token = Some(reactor.add(&slot.fd, false, true)?);
206 self.stdin_slot = Some(slot);
207 }
208 if let Some(mut slot) = self.stdout_slot.take() {
209 slot.token = Some(reactor.add(&slot.fd, true, false)?);
210 self.stdout_slot = Some(slot);
211 }
212 if let Some(mut slot) = self.stderr_slot.take() {
213 slot.token = Some(reactor.add(&slot.fd, true, false)?);
214 self.stderr_slot = Some(slot);
215 }
216 Ok(())
217 }
218
219 pub(crate) fn stdout_matches(&self, token: Token) -> bool {
220 self.stdout_slot
221 .as_ref()
222 .is_some_and(|slot| slot.token == Some(token))
223 }
224
225 pub(crate) fn stderr_matches(&self, token: Token) -> bool {
226 self.stderr_slot
227 .as_ref()
228 .is_some_and(|slot| slot.token == Some(token))
229 }
230
231 pub(crate) fn stdin_matches(&self, token: Token) -> bool {
232 self.stdin_slot
233 .as_ref()
234 .is_some_and(|slot| slot.token == Some(token))
235 }
236
237 pub(crate) fn drop_stdout(
238 &mut self,
239 reactor: &mut crate::reactor::Reactor,
240 ) -> Result<(), CoreError> {
241 if let Some(slot) = self.stdout_slot.take() {
242 reactor.del(&slot.fd)?;
243 }
244 Ok(())
245 }
246
247 pub(crate) fn drop_stderr(
248 &mut self,
249 reactor: &mut crate::reactor::Reactor,
250 ) -> Result<(), CoreError> {
251 if let Some(slot) = self.stderr_slot.take() {
252 reactor.del(&slot.fd)?;
253 }
254 Ok(())
255 }
256
257 pub(crate) fn drop_stdin(
258 &mut self,
259 reactor: &mut crate::reactor::Reactor,
260 ) -> Result<(), CoreError> {
261 if let Some(slot) = self.stdin_slot.take() {
262 reactor.del(&slot.fd)?;
263 }
264 self.writer.buf = None;
265 Ok(())
266 }
267
268 pub(crate) fn handle_stdout_ready(
269 &mut self,
270 reactor: &mut crate::reactor::Reactor,
271 ) -> Result<(), CoreError> {
272 if let Some(slot) = &self.stdout_slot {
273 let read_state = self
274 .buffer
275 .read_from_fd(&slot.fd, true, &mut self.early_exit)?;
276 if read_state != ReadState::Open {
277 self.drop_stdout(reactor)?;
278 }
279 }
280 Ok(())
281 }
282
283 pub(crate) fn handle_stderr_ready(
284 &mut self,
285 reactor: &mut crate::reactor::Reactor,
286 ) -> Result<(), CoreError> {
287 if let Some(slot) = &self.stderr_slot {
288 let read_state = self
289 .buffer
290 .read_from_fd(&slot.fd, false, &mut self.early_exit)?;
291 if read_state != ReadState::Open {
292 self.drop_stderr(reactor)?;
293 }
294 }
295 Ok(())
296 }
297
298 pub(crate) fn handle_stdin_writable(
299 &mut self,
300 reactor: &mut crate::reactor::Reactor,
301 ) -> Result<(), CoreError> {
302 if let Some(slot) = &self.stdin_slot {
303 let done = self.writer.write_to_fd(&slot.fd)?;
304 if done {
305 self.drop_stdin(reactor)?;
306 }
307 }
308 Ok(())
309 }
310
311 pub fn into_parts(mut self) -> (Vec<u8>, Vec<u8>) {
313 let (stdout, stderr, _, _) = std::mem::take(&mut self.buffer).into_parts();
314 (stdout, stderr)
315 }
316
317 #[inline(always)]
319 pub fn output_limit_exceeded(&self) -> bool {
320 self.buffer.output_limit_exceeded()
321 }
322
323 #[inline(always)]
325 pub fn stdout_early_exited(&self) -> bool {
326 self.buffer.stdout_early_exited()
327 }
328
329 pub(crate) fn into_parts_with_state(mut self) -> (Vec<u8>, Vec<u8>, bool, bool) {
331 std::mem::take(&mut self.buffer).into_parts()
332 }
333}