Skip to main content

spargio_process/
lib.rs

1//! Process companion APIs for spargio runtimes.
2//!
3//! These helpers expose async process status/output through spargio's
4//! `spawn_blocking` bridge.
5
6use spargio::{RuntimeError, RuntimeHandle};
7use std::ffi::OsStr;
8use std::io;
9use std::process::{Child, Command, ExitStatus, Output};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13pub async fn status(handle: &RuntimeHandle, command: Command) -> io::Result<ExitStatus> {
14    status_with_options(handle, command, CommandOptions::default()).await
15}
16
17pub async fn status_with_options(
18    handle: &RuntimeHandle,
19    mut command: Command,
20    options: CommandOptions,
21) -> io::Result<ExitStatus> {
22    run_blocking(
23        handle,
24        options,
25        move || command.status(),
26        "process status task canceled",
27        "process status task timed out",
28    )
29    .await
30}
31
32pub async fn output(handle: &RuntimeHandle, command: Command) -> io::Result<Output> {
33    output_with_options(handle, command, CommandOptions::default()).await
34}
35
36pub async fn output_with_options(
37    handle: &RuntimeHandle,
38    mut command: Command,
39    options: CommandOptions,
40) -> io::Result<Output> {
41    run_blocking(
42        handle,
43        options,
44        move || command.output(),
45        "process output task canceled",
46        "process output task timed out",
47    )
48    .await
49}
50
51pub async fn spawn(handle: &RuntimeHandle, command: Command) -> io::Result<ChildHandle> {
52    spawn_with_options(handle, command, CommandOptions::default()).await
53}
54
55pub async fn spawn_with_options(
56    handle: &RuntimeHandle,
57    mut command: Command,
58    options: CommandOptions,
59) -> io::Result<ChildHandle> {
60    let child = run_blocking(
61        handle,
62        options,
63        move || command.spawn(),
64        "process spawn task canceled",
65        "process spawn task timed out",
66    )
67    .await?;
68    Ok(ChildHandle {
69        handle: handle.clone(),
70        child: Arc::new(Mutex::new(Some(child))),
71    })
72}
73
/// Per-call options for the process helpers in this module.
///
/// The default value carries no timeout.
#[derive(Debug, Clone, Copy, Default)]
pub struct CommandOptions {
    /// Upper bound on how long a call waits for its blocking task, if any.
    timeout: Option<Duration>,
}

impl CommandOptions {
    /// Returns a copy of these options with the timeout set to `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
        }
    }

    /// Returns the configured timeout, or `None` when no timeout was set.
    fn timeout(self) -> Option<Duration> {
        let Self { timeout } = self;
        timeout
    }
}
89
/// Owned, chainable wrapper around [`std::process::Command`] whose terminal
/// methods run the command through a spargio runtime.
///
/// Implements `Debug` (delegating to `Command`'s own `Debug` output) so that
/// builders can be logged and embedded in other `Debug` types.
#[derive(Debug)]
pub struct CommandBuilder {
    /// The command being configured; consumed by the terminal methods.
    command: Command,
}
93
94impl CommandBuilder {
95    pub fn new(program: impl AsRef<OsStr>) -> Self {
96        Self {
97            command: Command::new(program),
98        }
99    }
100
101    pub fn arg(mut self, arg: impl AsRef<OsStr>) -> Self {
102        self.command.arg(arg);
103        self
104    }
105
106    pub fn args<I, S>(mut self, args: I) -> Self
107    where
108        I: IntoIterator<Item = S>,
109        S: AsRef<OsStr>,
110    {
111        self.command.args(args);
112        self
113    }
114
115    pub async fn status(self, handle: &RuntimeHandle) -> io::Result<ExitStatus> {
116        status(handle, self.command).await
117    }
118
119    pub async fn status_with_options(
120        self,
121        handle: &RuntimeHandle,
122        options: CommandOptions,
123    ) -> io::Result<ExitStatus> {
124        status_with_options(handle, self.command, options).await
125    }
126
127    pub async fn output(self, handle: &RuntimeHandle) -> io::Result<Output> {
128        output(handle, self.command).await
129    }
130
131    pub async fn output_with_options(
132        self,
133        handle: &RuntimeHandle,
134        options: CommandOptions,
135    ) -> io::Result<Output> {
136        output_with_options(handle, self.command, options).await
137    }
138
139    pub async fn spawn(self, handle: &RuntimeHandle) -> io::Result<ChildHandle> {
140        spawn(handle, self.command).await
141    }
142
143    pub async fn spawn_with_options(
144        self,
145        handle: &RuntimeHandle,
146        options: CommandOptions,
147    ) -> io::Result<ChildHandle> {
148        spawn_with_options(handle, self.command, options).await
149    }
150}
151
152#[derive(Clone)]
153pub struct ChildHandle {
154    handle: RuntimeHandle,
155    child: Arc<Mutex<Option<Child>>>,
156}
157
158impl ChildHandle {
159    pub fn id(&self) -> Option<u32> {
160        let guard = self.child.lock().expect("child lock poisoned");
161        guard.as_ref().map(Child::id)
162    }
163
164    pub async fn wait(&self) -> io::Result<ExitStatus> {
165        self.wait_with_options(CommandOptions::default()).await
166    }
167
168    pub async fn wait_with_options(&self, options: CommandOptions) -> io::Result<ExitStatus> {
169        self.run_with_child(
170            options,
171            |child| child.wait(),
172            "process wait task canceled",
173            "process wait task timed out",
174        )
175        .await
176    }
177
178    pub async fn try_wait(&self) -> io::Result<Option<ExitStatus>> {
179        self.run_with_child(
180            CommandOptions::default(),
181            |child| child.try_wait(),
182            "process try_wait task canceled",
183            "process try_wait task timed out",
184        )
185        .await
186    }
187
188    pub async fn kill(&self) -> io::Result<()> {
189        self.run_with_child(
190            CommandOptions::default(),
191            |child| child.kill(),
192            "process kill task canceled",
193            "process kill task timed out",
194        )
195        .await
196    }
197
198    pub async fn output(&self) -> io::Result<Output> {
199        self.output_with_options(CommandOptions::default()).await
200    }
201
202    pub async fn output_with_options(&self, options: CommandOptions) -> io::Result<Output> {
203        let child = self.take_child()?;
204        let handle = self.handle.clone();
205        run_blocking(
206            &handle,
207            options,
208            move || child.wait_with_output(),
209            "process output task canceled",
210            "process output task timed out",
211        )
212        .await
213    }
214
215    fn take_child(&self) -> io::Result<Child> {
216        let mut guard = self.child.lock().expect("child lock poisoned");
217        guard
218            .take()
219            .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed"))
220    }
221
222    async fn run_with_child<T, F>(
223        &self,
224        options: CommandOptions,
225        f: F,
226        canceled_msg: &'static str,
227        timeout_msg: &'static str,
228    ) -> io::Result<T>
229    where
230        T: Send + 'static,
231        F: FnOnce(&mut Child) -> io::Result<T> + Send + 'static,
232    {
233        let child = self.child.clone();
234        run_blocking(
235            &self.handle,
236            options,
237            move || {
238                let mut guard = child.lock().expect("child lock poisoned");
239                let child = guard.as_mut().ok_or_else(|| {
240                    io::Error::new(io::ErrorKind::BrokenPipe, "child already consumed")
241                })?;
242                f(child)
243            },
244            canceled_msg,
245            timeout_msg,
246        )
247        .await
248    }
249}
250
251async fn run_blocking<T, F>(
252    handle: &RuntimeHandle,
253    options: CommandOptions,
254    f: F,
255    canceled_msg: &'static str,
256    timeout_msg: &'static str,
257) -> io::Result<T>
258where
259    T: Send + 'static,
260    F: FnOnce() -> io::Result<T> + Send + 'static,
261{
262    let join = handle
263        .spawn_blocking(f)
264        .map_err(runtime_error_to_io_for_blocking)?;
265    let joined = match options.timeout() {
266        Some(duration) => match spargio::timeout(duration, join).await {
267            Ok(result) => result,
268            Err(_) => return Err(io::Error::new(io::ErrorKind::TimedOut, timeout_msg)),
269        },
270        None => join.await,
271    };
272    joined.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, canceled_msg))?
273}
274
275fn runtime_error_to_io_for_blocking(err: RuntimeError) -> io::Error {
276    match err {
277        RuntimeError::InvalidConfig(msg) => io::Error::new(io::ErrorKind::InvalidInput, msg),
278        RuntimeError::ThreadSpawn(io) => io,
279        RuntimeError::InvalidShard(shard) => {
280            io::Error::new(io::ErrorKind::NotFound, format!("invalid shard {shard}"))
281        }
282        RuntimeError::Closed => io::Error::new(io::ErrorKind::BrokenPipe, "runtime closed"),
283        RuntimeError::Overloaded => io::Error::new(io::ErrorKind::WouldBlock, "runtime overloaded"),
284        RuntimeError::UnsupportedBackend(msg) => io::Error::new(io::ErrorKind::Unsupported, msg),
285        RuntimeError::IoUringInit(io) => io,
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use std::time::Duration;

    /// Builds a `Command` for `program` with `args` appended in order.
    fn command(program: &str, args: &[&str]) -> Command {
        let mut cmd = Command::new(program);
        cmd.args(args);
        cmd
    }

    /// A shell command that exits immediately with status 0.
    fn success_command() -> Command {
        if cfg!(windows) {
            command("cmd", &["/C", "exit", "0"])
        } else {
            command("sh", &["-c", "exit 0"])
        }
    }

    /// A shell command that runs far longer than the 5 ms test timeout.
    fn slow_command() -> Command {
        if cfg!(windows) {
            command("cmd", &["/C", "ping -n 2 127.0.0.1 > nul"])
        } else {
            command("sh", &["-c", "sleep 0.1"])
        }
    }

    #[test]
    fn command_builder_status_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let (program, args) = if cfg!(windows) {
            ("cmd", vec!["/C", "exit", "0"])
        } else {
            ("sh", vec!["-c", "exit 0"])
        };
        let handle = rt.handle();
        let builder = CommandBuilder::new(program).args(args);
        let exit = block_on(builder.status(&handle)).expect("status");
        assert!(exit.success());
    }

    #[test]
    fn status_function_runs() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let handle = rt.handle();
        let exit = block_on(status(&handle, success_command())).expect("status");
        assert!(exit.success());
    }

    #[test]
    fn status_with_options_timeout_fails() {
        let rt = spargio::Runtime::builder()
            .shards(1)
            .build()
            .expect("runtime");
        let handle = rt.handle();
        let options = CommandOptions::default().with_timeout(Duration::from_millis(5));
        let err = block_on(status_with_options(&handle, slow_command(), options))
            .expect_err("timeout");
        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
    }
}
366}