coreshift_core/io/
drain.rs1use crate::CoreError;
16use crate::io::buffer::{BufferState, ReadState};
17use crate::io::writer::WriterState;
18use crate::reactor::{Fd, Token};
19
20pub(crate) struct FdSlot {
22 pub token: Option<Token>,
24 pub fd: Fd,
26}
27
28#[repr(align(64))]
50pub struct DrainState<F>
51where
52 F: FnMut(&[u8]) -> bool,
53{
54 pub(crate) stdout_slot: Option<FdSlot>,
55 pub(crate) stderr_slot: Option<FdSlot>,
56 pub(crate) stdin_slot: Option<FdSlot>,
57
58 pub(crate) buffer: BufferState,
59 pub(crate) writer: WriterState,
60
61 pub(crate) early_exit: Option<F>,
62}
63
64impl<F> DrainState<F>
65where
66 F: FnMut(&[u8]) -> bool,
67{
68 pub fn new(
72 stdin_fd: Option<Fd>,
73 stdin_buf: Option<Box<[u8]>>,
74 stdout_fd: Option<Fd>,
75 stderr_fd: Option<Fd>,
76 limit: usize,
77 early_exit: Option<F>,
78 ) -> Result<Self, CoreError> {
79 let stdin_slot = if stdin_buf.is_some() {
80 if let Some(fd) = stdin_fd {
81 fd.set_nonblock()?;
82 Some(FdSlot { token: None, fd })
83 } else {
84 None
85 }
86 } else {
87 None
88 };
89
90 let stdout_slot = if let Some(fd) = stdout_fd {
91 fd.set_nonblock()?;
92 Some(FdSlot { token: None, fd })
93 } else {
94 None
95 };
96
97 let stderr_slot = if let Some(fd) = stderr_fd {
98 fd.set_nonblock()?;
99 Some(FdSlot { token: None, fd })
100 } else {
101 None
102 };
103
104 Ok(Self {
105 stdin_slot,
106 stdout_slot,
107 stderr_slot,
108 buffer: BufferState::new(limit),
109 writer: WriterState::new(stdin_buf),
110 early_exit,
111 })
112 }
113
114 #[inline(always)]
116 pub fn is_done(&self) -> bool {
117 self.stdin_slot.is_none() && self.stdout_slot.is_none() && self.stderr_slot.is_none()
118 }
119
120 #[inline(always)]
122 pub fn write_stdin(&mut self) -> Result<bool, CoreError> {
123 let fd = if let Some(s) = &self.stdin_slot {
124 &s.fd
125 } else {
126 return Ok(true);
127 };
128
129 let done = self.writer.write_to_fd(fd)?;
130 if done {
131 self.stdin_slot.take();
132 return Ok(true);
133 }
134 Ok(false)
135 }
136
137 #[inline(always)]
139 pub fn read_fd(&mut self, is_stdout: bool) -> Result<bool, CoreError> {
140 let read_state = {
141 let slot = if is_stdout {
142 &self.stdout_slot
143 } else {
144 &self.stderr_slot
145 };
146 let fd = if let Some(s) = slot {
147 &s.fd
148 } else {
149 return Ok(true);
150 };
151 self.buffer
152 .read_from_fd(fd, is_stdout, &mut self.early_exit)?
153 };
154
155 if read_state != ReadState::Open {
156 if is_stdout {
157 self.stdout_slot.take();
158 return Ok(true);
159 } else {
160 self.stderr_slot.take();
161 return Ok(true);
162 }
163 }
164
165 Ok(false)
166 }
167
168 pub(crate) fn take_all_slots(&mut self) -> Vec<FdSlot> {
170 let mut slots = Vec::new();
171 if let Some(slot) = self.stdin_slot.take() {
172 slots.push(slot);
173 }
174 if let Some(slot) = self.stdout_slot.take() {
175 slots.push(slot);
176 }
177 if let Some(slot) = self.stderr_slot.take() {
178 slots.push(slot);
179 }
180 slots
181 }
182
183 pub(crate) fn register_with_reactor(
184 &mut self,
185 reactor: &mut crate::reactor::Reactor,
186 ) -> Result<(), CoreError> {
187 if let Some(mut slot) = self.stdin_slot.take() {
188 slot.token = Some(reactor.add(&slot.fd, false, true)?);
189 self.stdin_slot = Some(slot);
190 }
191 if let Some(mut slot) = self.stdout_slot.take() {
192 slot.token = Some(reactor.add(&slot.fd, true, false)?);
193 self.stdout_slot = Some(slot);
194 }
195 if let Some(mut slot) = self.stderr_slot.take() {
196 slot.token = Some(reactor.add(&slot.fd, true, false)?);
197 self.stderr_slot = Some(slot);
198 }
199 Ok(())
200 }
201
202 pub(crate) fn stdout_matches(&self, token: Token) -> bool {
203 self.stdout_slot
204 .as_ref()
205 .is_some_and(|slot| slot.token == Some(token))
206 }
207
208 pub(crate) fn stderr_matches(&self, token: Token) -> bool {
209 self.stderr_slot
210 .as_ref()
211 .is_some_and(|slot| slot.token == Some(token))
212 }
213
214 pub(crate) fn stdin_matches(&self, token: Token) -> bool {
215 self.stdin_slot
216 .as_ref()
217 .is_some_and(|slot| slot.token == Some(token))
218 }
219
220 pub(crate) fn drop_stdout(
221 &mut self,
222 reactor: &mut crate::reactor::Reactor,
223 ) -> Result<(), CoreError> {
224 if let Some(slot) = self.stdout_slot.take() {
225 reactor.del(&slot.fd)?;
226 }
227 Ok(())
228 }
229
230 pub(crate) fn drop_stderr(
231 &mut self,
232 reactor: &mut crate::reactor::Reactor,
233 ) -> Result<(), CoreError> {
234 if let Some(slot) = self.stderr_slot.take() {
235 reactor.del(&slot.fd)?;
236 }
237 Ok(())
238 }
239
240 pub(crate) fn drop_stdin(
241 &mut self,
242 reactor: &mut crate::reactor::Reactor,
243 ) -> Result<(), CoreError> {
244 if let Some(slot) = self.stdin_slot.take() {
245 reactor.del(&slot.fd)?;
246 }
247 self.writer.buf = None;
248 Ok(())
249 }
250
251 pub(crate) fn handle_stdout_ready(
252 &mut self,
253 reactor: &mut crate::reactor::Reactor,
254 ) -> Result<(), CoreError> {
255 if let Some(slot) = &self.stdout_slot {
256 let read_state = self
257 .buffer
258 .read_from_fd(&slot.fd, true, &mut self.early_exit)?;
259 if read_state != ReadState::Open {
260 self.drop_stdout(reactor)?;
261 }
262 }
263 Ok(())
264 }
265
266 pub(crate) fn handle_stderr_ready(
267 &mut self,
268 reactor: &mut crate::reactor::Reactor,
269 ) -> Result<(), CoreError> {
270 if let Some(slot) = &self.stderr_slot {
271 let read_state = self
272 .buffer
273 .read_from_fd(&slot.fd, false, &mut self.early_exit)?;
274 if read_state != ReadState::Open {
275 self.drop_stderr(reactor)?;
276 }
277 }
278 Ok(())
279 }
280
281 pub(crate) fn handle_stdin_writable(
282 &mut self,
283 reactor: &mut crate::reactor::Reactor,
284 ) -> Result<(), CoreError> {
285 if let Some(slot) = &self.stdin_slot {
286 let done = self.writer.write_to_fd(&slot.fd)?;
287 if done {
288 self.drop_stdin(reactor)?;
289 }
290 }
291 Ok(())
292 }
293
294 pub fn into_parts(mut self) -> (Vec<u8>, Vec<u8>) {
296 let (stdout, stderr, _, _) = std::mem::take(&mut self.buffer).into_parts();
297 (stdout, stderr)
298 }
299
300 #[inline(always)]
302 pub fn output_limit_exceeded(&self) -> bool {
303 self.buffer.output_limit_exceeded()
304 }
305
306 #[inline(always)]
308 pub fn stdout_early_exited(&self) -> bool {
309 self.buffer.stdout_early_exited()
310 }
311
312 pub(crate) fn into_parts_with_state(mut self) -> (Vec<u8>, Vec<u8>, bool, bool) {
314 std::mem::take(&mut self.buffer).into_parts()
315 }
316}