tutti_core/process_manager/implementations/
unix.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use libc::{killpg, setsid, SIGINT, SIGKILL};
5use tokio::{
6    io::BufReader,
7    process::{Child, Command},
8    time::{sleep, Instant},
9};
10use tokio_util::io::ReaderStream;
11
12use crate::{
13    error::{Error, Result},
14    process_manager::{
15        base::ProcessManager,
16        types::{CommandSpec, ProcId, Spawned},
17    },
18};
19
20#[derive(Debug)]
21struct ChildRec {
22    child: Child,
23    pgid: libc::pid_t,
24}
25
26/// Unix-specific process manager.
27#[derive(Debug)]
28pub struct UnixProcessManager {
29    // TODO: refactor
30    processes: Vec<Option<ChildRec>>,
31}
32
33impl Default for UnixProcessManager {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl UnixProcessManager {
40    #[must_use]
41    pub fn new() -> Self {
42        Self {
43            processes: Vec::new(),
44        }
45    }
46}
47
48#[async_trait::async_trait]
49impl ProcessManager for UnixProcessManager {
50    async fn spawn(&mut self, spec: CommandSpec) -> Result<Spawned> {
51        let mut cmd = Command::new(&spec.cmd[0]);
52        if spec.cmd.len() > 1 {
53            cmd.args(&spec.cmd[1..]);
54        }
55        if let Some(dir) = &spec.cwd {
56            cmd.current_dir(dir);
57        }
58        for (k, v) in &spec.env {
59            cmd.env(k, v);
60        }
61
62        #[allow(unsafe_code)]
63        unsafe {
64            cmd.pre_exec(|| {
65                if setsid() == -1 {
66                    return Err(std::io::Error::last_os_error());
67                }
68                Ok(())
69            });
70        }
71
72        cmd.stdout(std::process::Stdio::piped())
73            .stderr(std::process::Stdio::piped());
74
75        let mut child = cmd.spawn().map_err(Error::IO)?;
76
77        let pid = child.id();
78
79        let stdout = child
80            .stdout
81            .take()
82            .ok_or_else(|| Error::IO(std::io::Error::other("stdout not piped")))?;
83        let stderr = child
84            .stderr
85            .take()
86            .ok_or_else(|| Error::IO(std::io::Error::other("stdout not piped")))?;
87
88        let out_stream = ReaderStream::new(BufReader::new(stdout))
89            .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
90        let err_stream = ReaderStream::new(BufReader::new(stderr))
91            .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
92
93        let id = ProcId(self.processes.len() as u64);
94        self.processes.push(Some(ChildRec {
95            child,
96            pgid: libc::pid_t::try_from(
97                pid.ok_or_else(|| Error::IO(std::io::Error::other("pid not available")))?,
98            )
99            .map_err(|_| Error::IO(std::io::Error::other("pid not available")))?,
100        }));
101
102        Ok(Spawned {
103            id,
104            pid,
105            stdout: Box::pin(out_stream),
106            stderr: Box::pin(err_stream),
107        })
108    }
109
110    async fn shutdown(&mut self, id: ProcId) -> Result<()> {
111        let proc = self
112            .processes
113            .get(usize::try_from(id.0).map_err(|_| {
114                Error::IO(std::io::Error::other("Cannot convert process id to usize"))
115            })?)
116            .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
117            .as_ref()
118            .ok_or_else(|| {
119                Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
120            })?;
121
122        #[allow(unsafe_code)]
123        unsafe {
124            let rc = killpg(proc.pgid, SIGINT);
125            if rc == -1 {
126                return Err(Error::IO(std::io::Error::last_os_error()));
127            }
128        }
129
130        Ok(())
131    }
132
133    async fn wait(&mut self, id: ProcId, d: Duration) -> Result<Option<i32>> {
134        let index = usize::try_from(id.0)
135            .map_err(|_| Error::IO(std::io::Error::other("Cannot convert process id to usize")))?;
136        let proc = self
137            .processes
138            .get_mut(index)
139            .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
140            .as_mut()
141            .ok_or_else(|| {
142                Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
143            })?;
144
145        let start = Instant::now();
146        loop {
147            if let Ok(Some(code)) = proc.child.try_wait() {
148                self.processes[index] = None;
149                return Ok(Some(code.code().unwrap_or_default()));
150            }
151
152            if start.elapsed() >= d {
153                return Ok(None);
154            }
155            sleep(Duration::from_millis(50)).await;
156        }
157    }
158
159    async fn kill(&mut self, id: ProcId) -> Result<()> {
160        let proc = self
161            .processes
162            .get(usize::try_from(id.0).map_err(|_| {
163                Error::IO(std::io::Error::other("Cannot convert process id to usize"))
164            })?)
165            .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
166            .as_ref()
167            .ok_or_else(|| {
168                Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
169            })?;
170
171        #[allow(unsafe_code)]
172        unsafe {
173            let rc = killpg(proc.pgid, SIGKILL);
174            if rc == -1 {
175                return Err(Error::IO(std::io::Error::last_os_error()));
176            }
177        }
178
179        Ok(())
180    }
181}