1#![deny(missing_docs)]
6
7use spargio::{RuntimeError, RuntimeHandle};
8use std::ffi::OsStr;
9use std::io;
10use std::process::{Child, Command, ExitStatus, Output};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14pub async fn status(handle: &RuntimeHandle, command: Command) -> io::Result<ExitStatus> {
16 status_with_options(handle, command, CommandOptions::default()).await
17}
18
19pub async fn status_with_options(
21 handle: &RuntimeHandle,
22 mut command: Command,
23 options: CommandOptions,
24) -> io::Result<ExitStatus> {
25 run_blocking(
26 handle,
27 options,
28 move || command.status(),
29 "process status task canceled",
30 "process status task timed out",
31 )
32 .await
33}
34
35pub async fn output(handle: &RuntimeHandle, command: Command) -> io::Result<Output> {
37 output_with_options(handle, command, CommandOptions::default()).await
38}
39
40pub async fn output_with_options(
42 handle: &RuntimeHandle,
43 mut command: Command,
44 options: CommandOptions,
45) -> io::Result<Output> {
46 run_blocking(
47 handle,
48 options,
49 move || command.output(),
50 "process output task canceled",
51 "process output task timed out",
52 )
53 .await
54}
55
56pub async fn spawn(handle: &RuntimeHandle, command: Command) -> io::Result<ChildHandle> {
58 spawn_with_options(handle, command, CommandOptions::default()).await
59}
60
61pub async fn spawn_with_options(
63 handle: &RuntimeHandle,
64 mut command: Command,
65 options: CommandOptions,
66) -> io::Result<ChildHandle> {
67 let child = run_blocking(
68 handle,
69 options,
70 move || command.spawn(),
71 "process spawn task canceled",
72 "process spawn task timed out",
73 )
74 .await?;
75 Ok(ChildHandle {
76 handle: handle.clone(),
77 child: Arc::new(Mutex::new(Some(child))),
78 })
79}
80
81#[derive(Debug, Clone, Copy, Default)]
82pub struct CommandOptions {
84 timeout: Option<Duration>,
85}
86
87impl CommandOptions {
88 pub fn with_timeout(mut self, timeout: Duration) -> Self {
90 self.timeout = Some(timeout);
91 self
92 }
93
94 fn timeout(self) -> Option<Duration> {
95 self.timeout
96 }
97}
98
99pub struct CommandBuilder {
101 command: Command,
102}
103
104impl CommandBuilder {
105 pub fn new(program: impl AsRef<OsStr>) -> Self {
107 Self {
108 command: Command::new(program),
109 }
110 }
111
112 pub fn arg(mut self, arg: impl AsRef<OsStr>) -> Self {
114 self.command.arg(arg);
115 self
116 }
117
118 pub fn args<I, S>(mut self, args: I) -> Self
120 where
121 I: IntoIterator<Item = S>,
122 S: AsRef<OsStr>,
123 {
124 self.command.args(args);
125 self
126 }
127
128 pub async fn status(self, handle: &RuntimeHandle) -> io::Result<ExitStatus> {
130 status(handle, self.command).await
131 }
132
133 pub async fn status_with_options(
135 self,
136 handle: &RuntimeHandle,
137 options: CommandOptions,
138 ) -> io::Result<ExitStatus> {
139 status_with_options(handle, self.command, options).await
140 }
141
142 pub async fn output(self, handle: &RuntimeHandle) -> io::Result<Output> {
144 output(handle, self.command).await
145 }
146
147 pub async fn output_with_options(
149 self,
150 handle: &RuntimeHandle,
151 options: CommandOptions,
152 ) -> io::Result<Output> {
153 output_with_options(handle, self.command, options).await
154 }
155
156 pub async fn spawn(self, handle: &RuntimeHandle) -> io::Result<ChildHandle> {
158 spawn(handle, self.command).await
159 }
160
161 pub async fn spawn_with_options(
163 self,
164 handle: &RuntimeHandle,
165 options: CommandOptions,
166 ) -> io::Result<ChildHandle> {
167 spawn_with_options(handle, self.command, options).await
168 }
169}
170
171#[derive(Clone)]
172pub struct ChildHandle {
174 handle: RuntimeHandle,
175 child: Arc<Mutex<Option<Child>>>,
176}
177
178impl ChildHandle {
179 pub fn id(&self) -> Option<u32> {
181 let guard = self.child.lock().expect("child lock poisoned");
182 guard.as_ref().map(Child::id)
183 }
184
185 pub async fn wait(&self) -> io::Result<ExitStatus> {
187 self.wait_with_options(CommandOptions::default()).await
188 }
189
190 pub async fn wait_with_options(&self, options: CommandOptions) -> io::Result<ExitStatus> {
192 self.run_with_child(
193 options,
194 |child| child.wait(),
195 "process wait task canceled",
196 "process wait task timed out",
197 )
198 .await
199 }
200
201 pub async fn try_wait(&self) -> io::Result<Option<ExitStatus>> {
203 self.run_with_child(
204 CommandOptions::default(),
205 |child| child.try_wait(),
206 "process try_wait task canceled",
207 "process try_wait task timed out",
208 )
209 .await
210 }
211
212 pub async fn kill(&self) -> io::Result<()> {
214 self.run_with_child(
215 CommandOptions::default(),
216 |child| child.kill(),
217 "process kill task canceled",
218 "process kill task timed out",
219 )
220 .await
221 }
222
223 pub async fn output(&self) -> io::Result<Output> {
225 self.output_with_options(CommandOptions::default()).await
226 }
227
228 pub async fn output_with_options(&self, options: CommandOptions) -> io::Result<Output> {
230 let child = self.take_child()?;
231 let handle = self.handle.clone();
232 run_blocking(
233 &handle,
234 options,
235 move || child.wait_with_output(),
236 "process output task canceled",
237 "process output task timed out",
238 )
239 .await
240 }
241
242 fn take_child(&self) -> io::Result<Child> {
243 let mut guard = self.child.lock().expect("child lock poisoned");
244 guard
245 .take()
246 .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed"))
247 }
248
249 async fn run_with_child<T, F>(
250 &self,
251 options: CommandOptions,
252 f: F,
253 canceled_msg: &'static str,
254 timeout_msg: &'static str,
255 ) -> io::Result<T>
256 where
257 T: Send + 'static,
258 F: FnOnce(&mut Child) -> io::Result<T> + Send + 'static,
259 {
260 let child = self.child.clone();
261 run_blocking(
262 &self.handle,
263 options,
264 move || {
265 let mut guard = child.lock().expect("child lock poisoned");
266 let child = guard.as_mut().ok_or_else(|| {
267 io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed")
268 })?;
269 f(child)
270 },
271 canceled_msg,
272 timeout_msg,
273 )
274 .await
275 }
276}
277
278async fn run_blocking<T, F>(
279 handle: &RuntimeHandle,
280 options: CommandOptions,
281 f: F,
282 canceled_msg: &'static str,
283 timeout_msg: &'static str,
284) -> io::Result<T>
285where
286 T: Send + 'static,
287 F: FnOnce() -> io::Result<T> + Send + 'static,
288{
289 let join = handle
290 .spawn_blocking(f)
291 .map_err(runtime_error_to_io_for_blocking)?;
292 let joined = match options.timeout() {
293 Some(duration) => match spargio::timeout(duration, join).await {
294 Ok(result) => result,
295 Err(_) => return Err(io::Error::new(io::ErrorKind::TimedOut, timeout_msg)),
296 },
297 None => join.await,
298 };
299 joined.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, canceled_msg))?
300}
301
302fn runtime_error_to_io_for_blocking(err: RuntimeError) -> io::Error {
303 match err {
304 RuntimeError::InvalidConfig(msg) => io::Error::new(io::ErrorKind::InvalidInput, msg),
305 RuntimeError::ThreadSpawn(io) => io,
306 RuntimeError::InvalidShard(shard) => {
307 io::Error::new(io::ErrorKind::NotFound, format!("invalid shard {shard}"))
308 }
309 RuntimeError::Closed => io::Error::new(io::ErrorKind::BrokenPipe, "runtime closed"),
310 RuntimeError::Overloaded => io::Error::new(io::ErrorKind::WouldBlock, "runtime overloaded"),
311 RuntimeError::UnsupportedBackend(msg) => io::Error::new(io::ErrorKind::Unsupported, msg),
312 RuntimeError::IoUringInit(io) => io,
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319 use futures::executor::block_on;
320 use std::time::Duration;
321
322 fn success_command() -> Command {
323 if cfg!(windows) {
324 let mut cmd = Command::new("cmd");
325 cmd.args(["/C", "exit", "0"]);
326 cmd
327 } else {
328 let mut cmd = Command::new("sh");
329 cmd.args(["-c", "exit 0"]);
330 cmd
331 }
332 }
333
334 #[test]
335 fn command_builder_status_runs() {
336 let rt = spargio::Runtime::builder()
337 .shards(1)
338 .build()
339 .expect("runtime");
340 let status = block_on(async {
341 CommandBuilder::new(if cfg!(windows) { "cmd" } else { "sh" })
342 .args(if cfg!(windows) {
343 vec!["/C", "exit", "0"]
344 } else {
345 vec!["-c", "exit 0"]
346 })
347 .status(&rt.handle())
348 .await
349 .expect("status")
350 });
351 assert!(status.success());
352 }
353
354 #[test]
355 fn status_function_runs() {
356 let rt = spargio::Runtime::builder()
357 .shards(1)
358 .build()
359 .expect("runtime");
360 let status = block_on(async {
361 status(&rt.handle(), success_command())
362 .await
363 .expect("status")
364 });
365 assert!(status.success());
366 }
367
368 #[test]
369 fn status_with_options_timeout_fails() {
370 let rt = spargio::Runtime::builder()
371 .shards(1)
372 .build()
373 .expect("runtime");
374 let err = block_on(async {
375 status_with_options(
376 &rt.handle(),
377 if cfg!(windows) {
378 let mut cmd = Command::new("cmd");
379 cmd.args(["/C", "ping -n 2 127.0.0.1 > nul"]);
380 cmd
381 } else {
382 let mut cmd = Command::new("sh");
383 cmd.args(["-c", "sleep 0.1"]);
384 cmd
385 },
386 CommandOptions::default().with_timeout(Duration::from_millis(5)),
387 )
388 .await
389 .expect_err("timeout")
390 });
391 assert_eq!(err.kind(), io::ErrorKind::TimedOut);
392 }
393}