1use spargio::{RuntimeError, RuntimeHandle};
7use std::ffi::OsStr;
8use std::io;
9use std::process::{Child, Command, ExitStatus, Output};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13pub async fn status(handle: &RuntimeHandle, command: Command) -> io::Result<ExitStatus> {
14 status_with_options(handle, command, CommandOptions::default()).await
15}
16
17pub async fn status_with_options(
18 handle: &RuntimeHandle,
19 mut command: Command,
20 options: CommandOptions,
21) -> io::Result<ExitStatus> {
22 run_blocking(
23 handle,
24 options,
25 move || command.status(),
26 "process status task canceled",
27 "process status task timed out",
28 )
29 .await
30}
31
32pub async fn output(handle: &RuntimeHandle, command: Command) -> io::Result<Output> {
33 output_with_options(handle, command, CommandOptions::default()).await
34}
35
36pub async fn output_with_options(
37 handle: &RuntimeHandle,
38 mut command: Command,
39 options: CommandOptions,
40) -> io::Result<Output> {
41 run_blocking(
42 handle,
43 options,
44 move || command.output(),
45 "process output task canceled",
46 "process output task timed out",
47 )
48 .await
49}
50
51pub async fn spawn(handle: &RuntimeHandle, command: Command) -> io::Result<ChildHandle> {
52 spawn_with_options(handle, command, CommandOptions::default()).await
53}
54
55pub async fn spawn_with_options(
56 handle: &RuntimeHandle,
57 mut command: Command,
58 options: CommandOptions,
59) -> io::Result<ChildHandle> {
60 let child = run_blocking(
61 handle,
62 options,
63 move || command.spawn(),
64 "process spawn task canceled",
65 "process spawn task timed out",
66 )
67 .await?;
68 Ok(ChildHandle {
69 handle: handle.clone(),
70 child: Arc::new(Mutex::new(Some(child))),
71 })
72}
73
/// Options that tune how a process operation is executed.
///
/// The default value has no timeout configured.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CommandOptions {
    /// Upper bound on how long the caller waits; `None` means no limit.
    timeout: Option<Duration>,
}

impl CommandOptions {
    /// Returns a copy of these options with the timeout set to `timeout`.
    ///
    /// The receiver is taken by value and the updated options are returned, so
    /// the result must be used; discarding it leaves the timeout unset.
    #[must_use = "with_timeout returns the updated options instead of modifying in place"]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Returns the configured timeout, or `None` when no limit was set.
    fn timeout(self) -> Option<Duration> {
        self.timeout
    }
}
89
/// Chainable wrapper around [`Command`] whose terminal methods run the
/// command through a `RuntimeHandle`.
#[derive(Debug)]
pub struct CommandBuilder {
    /// The command being assembled.
    command: Command,
}
93
94impl CommandBuilder {
95 pub fn new(program: impl AsRef<OsStr>) -> Self {
96 Self {
97 command: Command::new(program),
98 }
99 }
100
101 pub fn arg(mut self, arg: impl AsRef<OsStr>) -> Self {
102 self.command.arg(arg);
103 self
104 }
105
106 pub fn args<I, S>(mut self, args: I) -> Self
107 where
108 I: IntoIterator<Item = S>,
109 S: AsRef<OsStr>,
110 {
111 self.command.args(args);
112 self
113 }
114
115 pub async fn status(self, handle: &RuntimeHandle) -> io::Result<ExitStatus> {
116 status(handle, self.command).await
117 }
118
119 pub async fn status_with_options(
120 self,
121 handle: &RuntimeHandle,
122 options: CommandOptions,
123 ) -> io::Result<ExitStatus> {
124 status_with_options(handle, self.command, options).await
125 }
126
127 pub async fn output(self, handle: &RuntimeHandle) -> io::Result<Output> {
128 output(handle, self.command).await
129 }
130
131 pub async fn output_with_options(
132 self,
133 handle: &RuntimeHandle,
134 options: CommandOptions,
135 ) -> io::Result<Output> {
136 output_with_options(handle, self.command, options).await
137 }
138
139 pub async fn spawn(self, handle: &RuntimeHandle) -> io::Result<ChildHandle> {
140 spawn(handle, self.command).await
141 }
142
143 pub async fn spawn_with_options(
144 self,
145 handle: &RuntimeHandle,
146 options: CommandOptions,
147 ) -> io::Result<ChildHandle> {
148 spawn_with_options(handle, self.command, options).await
149 }
150}
151
152#[derive(Clone)]
153pub struct ChildHandle {
154 handle: RuntimeHandle,
155 child: Arc<Mutex<Option<Child>>>,
156}
157
158impl ChildHandle {
159 pub fn id(&self) -> Option<u32> {
160 let guard = self.child.lock().expect("child lock poisoned");
161 guard.as_ref().map(Child::id)
162 }
163
164 pub async fn wait(&self) -> io::Result<ExitStatus> {
165 self.wait_with_options(CommandOptions::default()).await
166 }
167
168 pub async fn wait_with_options(&self, options: CommandOptions) -> io::Result<ExitStatus> {
169 self.run_with_child(
170 options,
171 |child| child.wait(),
172 "process wait task canceled",
173 "process wait task timed out",
174 )
175 .await
176 }
177
178 pub async fn try_wait(&self) -> io::Result<Option<ExitStatus>> {
179 self.run_with_child(
180 CommandOptions::default(),
181 |child| child.try_wait(),
182 "process try_wait task canceled",
183 "process try_wait task timed out",
184 )
185 .await
186 }
187
188 pub async fn kill(&self) -> io::Result<()> {
189 self.run_with_child(
190 CommandOptions::default(),
191 |child| child.kill(),
192 "process kill task canceled",
193 "process kill task timed out",
194 )
195 .await
196 }
197
198 pub async fn output(&self) -> io::Result<Output> {
199 self.output_with_options(CommandOptions::default()).await
200 }
201
202 pub async fn output_with_options(&self, options: CommandOptions) -> io::Result<Output> {
203 let child = self.take_child()?;
204 let handle = self.handle.clone();
205 run_blocking(
206 &handle,
207 options,
208 move || child.wait_with_output(),
209 "process output task canceled",
210 "process output task timed out",
211 )
212 .await
213 }
214
215 fn take_child(&self) -> io::Result<Child> {
216 let mut guard = self.child.lock().expect("child lock poisoned");
217 guard
218 .take()
219 .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed"))
220 }
221
222 async fn run_with_child<T, F>(
223 &self,
224 options: CommandOptions,
225 f: F,
226 canceled_msg: &'static str,
227 timeout_msg: &'static str,
228 ) -> io::Result<T>
229 where
230 T: Send + 'static,
231 F: FnOnce(&mut Child) -> io::Result<T> + Send + 'static,
232 {
233 let child = self.child.clone();
234 run_blocking(
235 &self.handle,
236 options,
237 move || {
238 let mut guard = child.lock().expect("child lock poisoned");
239 let child = guard.as_mut().ok_or_else(|| {
240 io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed")
241 })?;
242 f(child)
243 },
244 canceled_msg,
245 timeout_msg,
246 )
247 .await
248 }
249}
250
251async fn run_blocking<T, F>(
252 handle: &RuntimeHandle,
253 options: CommandOptions,
254 f: F,
255 canceled_msg: &'static str,
256 timeout_msg: &'static str,
257) -> io::Result<T>
258where
259 T: Send + 'static,
260 F: FnOnce() -> io::Result<T> + Send + 'static,
261{
262 let join = handle
263 .spawn_blocking(f)
264 .map_err(runtime_error_to_io_for_blocking)?;
265 let joined = match options.timeout() {
266 Some(duration) => match spargio::timeout(duration, join).await {
267 Ok(result) => result,
268 Err(_) => return Err(io::Error::new(io::ErrorKind::TimedOut, timeout_msg)),
269 },
270 None => join.await,
271 };
272 joined.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, canceled_msg))?
273}
274
275fn runtime_error_to_io_for_blocking(err: RuntimeError) -> io::Error {
276 match err {
277 RuntimeError::InvalidConfig(msg) => io::Error::new(io::ErrorKind::InvalidInput, msg),
278 RuntimeError::ThreadSpawn(io) => io,
279 RuntimeError::InvalidShard(shard) => {
280 io::Error::new(io::ErrorKind::NotFound, format!("invalid shard {shard}"))
281 }
282 RuntimeError::Closed => io::Error::new(io::ErrorKind::BrokenPipe, "runtime closed"),
283 RuntimeError::Overloaded => io::Error::new(io::ErrorKind::WouldBlock, "runtime overloaded"),
284 RuntimeError::UnsupportedBackend(msg) => io::Error::new(io::ErrorKind::Unsupported, msg),
285 RuntimeError::IoUringInit(io) => io,
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use std::time::Duration;

    /// Platform shell plus the arguments that make it exit at once with code 0.
    fn success_invocation() -> (&'static str, Vec<&'static str>) {
        if cfg!(windows) {
            ("cmd", vec!["/C", "exit", "0"])
        } else {
            ("sh", vec!["-c", "exit 0"])
        }
    }

    /// Command that exits successfully right away.
    fn success_command() -> Command {
        let (program, args) = success_invocation();
        let mut cmd = Command::new(program);
        cmd.args(args);
        cmd
    }

    /// Command that runs long enough to outlast the timeout used below.
    fn slow_command() -> Command {
        let (program, args) = if cfg!(windows) {
            ("cmd", ["/C", "ping -n 2 127.0.0.1 > nul"])
        } else {
            ("sh", ["-c", "sleep 0.1"])
        };
        let mut cmd = Command::new(program);
        cmd.args(args);
        cmd
    }

    #[test]
    fn command_builder_status_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let (program, args) = success_invocation();
        let exit = block_on(async {
            CommandBuilder::new(program)
                .args(args)
                .status(&rt.handle())
                .await
                .expect("status")
        });
        assert!(exit.success());
    }

    #[test]
    fn status_function_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let exit = block_on(async {
            status(&rt.handle(), success_command())
                .await
                .expect("status")
        });
        assert!(exit.success());
    }

    #[test]
    fn status_with_options_timeout_fails() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let options = CommandOptions::default().with_timeout(Duration::from_millis(5));
        let failure = block_on(async {
            status_with_options(&rt.handle(), slow_command(), options)
                .await
                .expect_err("timeout")
        });
        assert_eq!(failure.kind(), io::ErrorKind::TimedOut);
    }
}