Skip to main content

spargio_process/
lib.rs

1//! Process companion APIs for spargio runtimes.
2//!
3//! These helpers expose async process status/output through spargio's
4//! `spawn_blocking` bridge.
5#![deny(missing_docs)]
6
7use spargio::{RuntimeError, RuntimeHandle};
8use std::ffi::OsStr;
9use std::io;
10use std::process::{Child, Command, ExitStatus, Output};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14/// Runs `command` and resolves with its exit status.
15pub async fn status(handle: &RuntimeHandle, command: Command) -> io::Result<ExitStatus> {
16    status_with_options(handle, command, CommandOptions::default()).await
17}
18
19/// Runs `command` and resolves with its exit status using custom options.
20pub async fn status_with_options(
21    handle: &RuntimeHandle,
22    mut command: Command,
23    options: CommandOptions,
24) -> io::Result<ExitStatus> {
25    run_blocking(
26        handle,
27        options,
28        move || command.status(),
29        "process status task canceled",
30        "process status task timed out",
31    )
32    .await
33}
34
35/// Runs `command` and resolves with captured output.
36pub async fn output(handle: &RuntimeHandle, command: Command) -> io::Result<Output> {
37    output_with_options(handle, command, CommandOptions::default()).await
38}
39
40/// Runs `command` and resolves with captured output using custom options.
41pub async fn output_with_options(
42    handle: &RuntimeHandle,
43    mut command: Command,
44    options: CommandOptions,
45) -> io::Result<Output> {
46    run_blocking(
47        handle,
48        options,
49        move || command.output(),
50        "process output task canceled",
51        "process output task timed out",
52    )
53    .await
54}
55
56/// Spawns `command` and returns a handle for further interaction.
57pub async fn spawn(handle: &RuntimeHandle, command: Command) -> io::Result<ChildHandle> {
58    spawn_with_options(handle, command, CommandOptions::default()).await
59}
60
61/// Spawns `command` and returns a handle for further interaction using options.
62pub async fn spawn_with_options(
63    handle: &RuntimeHandle,
64    mut command: Command,
65    options: CommandOptions,
66) -> io::Result<ChildHandle> {
67    let child = run_blocking(
68        handle,
69        options,
70        move || command.spawn(),
71        "process spawn task canceled",
72        "process spawn task timed out",
73    )
74    .await?;
75    Ok(ChildHandle {
76        handle: handle.clone(),
77        child: Arc::new(Mutex::new(Some(child))),
78    })
79}
80
/// Options applied to blocking process operations.
///
/// The default value carries no timeout.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommandOptions {
    /// Upper bound on how long an operation may take; `None` means unbounded.
    timeout: Option<Duration>,
}

impl CommandOptions {
    /// Returns a copy of these options with the operation timeout set to `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        let mut updated = self;
        updated.timeout = Some(timeout);
        updated
    }

    /// Returns the configured timeout, if any.
    fn timeout(self) -> Option<Duration> {
        let Self { timeout } = self;
        timeout
    }
}
98
/// Fluent builder for process commands.
///
/// Wraps a [`std::process::Command`] so that configuration calls can be
/// chained by value. Implements `Debug` (delegating to the wrapped command)
/// so builders can be logged or embedded in other `Debug` types.
#[derive(Debug)]
pub struct CommandBuilder {
    /// The command being configured.
    command: Command,
}
103
104impl CommandBuilder {
105    /// Creates a new command builder with `program`.
106    pub fn new(program: impl AsRef<OsStr>) -> Self {
107        Self {
108            command: Command::new(program),
109        }
110    }
111
112    /// Appends one argument.
113    pub fn arg(mut self, arg: impl AsRef<OsStr>) -> Self {
114        self.command.arg(arg);
115        self
116    }
117
118    /// Appends multiple arguments.
119    pub fn args<I, S>(mut self, args: I) -> Self
120    where
121        I: IntoIterator<Item = S>,
122        S: AsRef<OsStr>,
123    {
124        self.command.args(args);
125        self
126    }
127
128    /// Runs the built command and returns its exit status.
129    pub async fn status(self, handle: &RuntimeHandle) -> io::Result<ExitStatus> {
130        status(handle, self.command).await
131    }
132
133    /// Runs the built command and returns its exit status using options.
134    pub async fn status_with_options(
135        self,
136        handle: &RuntimeHandle,
137        options: CommandOptions,
138    ) -> io::Result<ExitStatus> {
139        status_with_options(handle, self.command, options).await
140    }
141
142    /// Runs the built command and returns captured output.
143    pub async fn output(self, handle: &RuntimeHandle) -> io::Result<Output> {
144        output(handle, self.command).await
145    }
146
147    /// Runs the built command and returns captured output using options.
148    pub async fn output_with_options(
149        self,
150        handle: &RuntimeHandle,
151        options: CommandOptions,
152    ) -> io::Result<Output> {
153        output_with_options(handle, self.command, options).await
154    }
155
156    /// Spawns the built command and returns a child handle.
157    pub async fn spawn(self, handle: &RuntimeHandle) -> io::Result<ChildHandle> {
158        spawn(handle, self.command).await
159    }
160
161    /// Spawns the built command and returns a child handle using options.
162    pub async fn spawn_with_options(
163        self,
164        handle: &RuntimeHandle,
165        options: CommandOptions,
166    ) -> io::Result<ChildHandle> {
167        spawn_with_options(handle, self.command, options).await
168    }
169}
170
171#[derive(Clone)]
172/// Async wrapper around a spawned child process.
173pub struct ChildHandle {
174    handle: RuntimeHandle,
175    child: Arc<Mutex<Option<Child>>>,
176}
177
178impl ChildHandle {
179    /// Returns the process id if the child is still present.
180    pub fn id(&self) -> Option<u32> {
181        let guard = self.child.lock().expect("child lock poisoned");
182        guard.as_ref().map(Child::id)
183    }
184
185    /// Waits for child termination.
186    pub async fn wait(&self) -> io::Result<ExitStatus> {
187        self.wait_with_options(CommandOptions::default()).await
188    }
189
190    /// Waits for child termination using options.
191    pub async fn wait_with_options(&self, options: CommandOptions) -> io::Result<ExitStatus> {
192        self.run_with_child(
193            options,
194            |child| child.wait(),
195            "process wait task canceled",
196            "process wait task timed out",
197        )
198        .await
199    }
200
201    /// Non-blocking check for child termination.
202    pub async fn try_wait(&self) -> io::Result<Option<ExitStatus>> {
203        self.run_with_child(
204            CommandOptions::default(),
205            |child| child.try_wait(),
206            "process try_wait task canceled",
207            "process try_wait task timed out",
208        )
209        .await
210    }
211
212    /// Sends a kill signal to the child process.
213    pub async fn kill(&self) -> io::Result<()> {
214        self.run_with_child(
215            CommandOptions::default(),
216            |child| child.kill(),
217            "process kill task canceled",
218            "process kill task timed out",
219        )
220        .await
221    }
222
223    /// Waits for child termination and captures output.
224    pub async fn output(&self) -> io::Result<Output> {
225        self.output_with_options(CommandOptions::default()).await
226    }
227
228    /// Waits for child termination and captures output using options.
229    pub async fn output_with_options(&self, options: CommandOptions) -> io::Result<Output> {
230        let child = self.take_child()?;
231        let handle = self.handle.clone();
232        run_blocking(
233            &handle,
234            options,
235            move || child.wait_with_output(),
236            "process output task canceled",
237            "process output task timed out",
238        )
239        .await
240    }
241
242    fn take_child(&self) -> io::Result<Child> {
243        let mut guard = self.child.lock().expect("child lock poisoned");
244        guard
245            .take()
246            .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed"))
247    }
248
249    async fn run_with_child<T, F>(
250        &self,
251        options: CommandOptions,
252        f: F,
253        canceled_msg: &'static str,
254        timeout_msg: &'static str,
255    ) -> io::Result<T>
256    where
257        T: Send + 'static,
258        F: FnOnce(&mut Child) -> io::Result<T> + Send + 'static,
259    {
260        let child = self.child.clone();
261        run_blocking(
262            &self.handle,
263            options,
264            move || {
265                let mut guard = child.lock().expect("child lock poisoned");
266                let child = guard.as_mut().ok_or_else(|| {
267                    io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed")
268                })?;
269                f(child)
270            },
271            canceled_msg,
272            timeout_msg,
273        )
274        .await
275    }
276}
277
278async fn run_blocking<T, F>(
279    handle: &RuntimeHandle,
280    options: CommandOptions,
281    f: F,
282    canceled_msg: &'static str,
283    timeout_msg: &'static str,
284) -> io::Result<T>
285where
286    T: Send + 'static,
287    F: FnOnce() -> io::Result<T> + Send + 'static,
288{
289    let join = handle
290        .spawn_blocking(f)
291        .map_err(runtime_error_to_io_for_blocking)?;
292    let joined = match options.timeout() {
293        Some(duration) => match spargio::timeout(duration, join).await {
294            Ok(result) => result,
295            Err(_) => return Err(io::Error::new(io::ErrorKind::TimedOut, timeout_msg)),
296        },
297        None => join.await,
298    };
299    joined.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, canceled_msg))?
300}
301
302fn runtime_error_to_io_for_blocking(err: RuntimeError) -> io::Error {
303    match err {
304        RuntimeError::InvalidConfig(msg) => io::Error::new(io::ErrorKind::InvalidInput, msg),
305        RuntimeError::ThreadSpawn(io) => io,
306        RuntimeError::InvalidShard(shard) => {
307            io::Error::new(io::ErrorKind::NotFound, format!("invalid shard {shard}"))
308        }
309        RuntimeError::Closed => io::Error::new(io::ErrorKind::BrokenPipe, "runtime closed"),
310        RuntimeError::Overloaded => io::Error::new(io::ErrorKind::WouldBlock, "runtime overloaded"),
311        RuntimeError::UnsupportedBackend(msg) => io::Error::new(io::ErrorKind::Unsupported, msg),
312        RuntimeError::IoUringInit(io) => io,
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use std::time::Duration;

    /// Builds a platform-appropriate command that exits with code 0.
    fn success_command() -> Command {
        let mut cmd = Command::new(if cfg!(windows) { "cmd" } else { "sh" });
        if cfg!(windows) {
            cmd.args(["/C", "exit", "0"]);
        } else {
            cmd.args(["-c", "exit 0"]);
        }
        cmd
    }

    /// `CommandBuilder::status` runs a trivially successful command.
    #[test]
    fn command_builder_status_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let handle = rt.handle();

        let (program, arguments) = if cfg!(windows) {
            ("cmd", vec!["/C", "exit", "0"])
        } else {
            ("sh", vec!["-c", "exit 0"])
        };
        let builder = CommandBuilder::new(program).args(arguments);

        let exit = block_on(builder.status(&handle)).expect("status");
        assert!(exit.success());
    }

    /// The free `status` function runs a trivially successful command.
    #[test]
    fn status_function_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let handle = rt.handle();

        let exit = block_on(status(&handle, success_command())).expect("status");
        assert!(exit.success());
    }

    /// A command that outlives the configured timeout yields `TimedOut`.
    #[test]
    fn status_with_options_timeout_fails() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let handle = rt.handle();

        // A command that runs noticeably longer than the 5 ms timeout below.
        let mut slow = Command::new(if cfg!(windows) { "cmd" } else { "sh" });
        if cfg!(windows) {
            slow.args(["/C", "ping -n 2 127.0.0.1 > nul"]);
        } else {
            slow.args(["-c", "sleep 0.1"]);
        }
        let options = CommandOptions::default().with_timeout(Duration::from_millis(5));

        let err = block_on(status_with_options(&handle, slow, options)).expect_err("timeout");
        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
    }
}