tutti_core/process/
unix.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use libc::{killpg, setsid, SIGINT, SIGKILL};
5use tokio::{
6    io::BufReader,
7    process::{Child, Command},
8    time::{sleep, Instant},
9};
10use tokio_util::io::ReaderStream;
11
12use super::{CommandSpec, ProcId, ProcessManager, Spawned};
13
14#[derive(Debug)]
15struct ChildRec {
16    child: Child,
17    pgid: libc::pid_t,
18}
19
20/// Unix-specific process manager.
21#[derive(Debug)]
22pub struct UnixProcessManager {
23    // TODO: refactor
24    processes: Vec<Option<ChildRec>>,
25}
26
27impl Default for UnixProcessManager {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl UnixProcessManager {
34    #[must_use]
35    pub fn new() -> Self {
36        Self {
37            processes: Vec::new(),
38        }
39    }
40}
41
42#[async_trait::async_trait]
43impl ProcessManager for UnixProcessManager {
44    async fn spawn(&mut self, spec: CommandSpec) -> anyhow::Result<Spawned> {
45        anyhow::ensure!(
46            !spec.cmd.is_empty(),
47            "empty cmd for service `{}`",
48            spec.name
49        );
50
51        let mut cmd = Command::new(&spec.cmd[0]);
52        if spec.cmd.len() > 1 {
53            cmd.args(&spec.cmd[1..]);
54        }
55        if let Some(dir) = &spec.cwd {
56            cmd.current_dir(dir);
57        }
58        for (k, v) in &spec.env {
59            cmd.env(k, v);
60        }
61
62        #[allow(unsafe_code)]
63        unsafe {
64            cmd.pre_exec(|| {
65                if setsid() == -1 {
66                    return Err(std::io::Error::last_os_error());
67                }
68                Ok(())
69            });
70        }
71
72        cmd.stdout(std::process::Stdio::piped())
73            .stderr(std::process::Stdio::piped());
74
75        let mut child = cmd.spawn()?;
76
77        let pid = child.id();
78
79        let stdout = child
80            .stdout
81            .take()
82            .ok_or_else(|| anyhow::anyhow!("stdout not piped"))?;
83        let stderr = child
84            .stderr
85            .take()
86            .ok_or_else(|| anyhow::anyhow!("stderr not piped"))?;
87
88        let out_stream = ReaderStream::new(BufReader::new(stdout))
89            .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
90        let err_stream = ReaderStream::new(BufReader::new(stderr))
91            .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
92
93        let id = ProcId(self.processes.len() as u64);
94        self.processes.push(Some(ChildRec {
95            child,
96            pgid: libc::pid_t::try_from(
97                pid.ok_or_else(|| anyhow::anyhow!("spawned process has no pid"))?,
98            )?,
99        }));
100
101        Ok(Spawned {
102            id,
103            pid,
104            stdout: Box::pin(out_stream),
105            stderr: Box::pin(err_stream),
106        })
107    }
108
109    async fn shutdown(&mut self, id: ProcId) -> anyhow::Result<()> {
110        let proc = self
111            .processes
112            .get(usize::try_from(id.0)?)
113            .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
114            .as_ref()
115            .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
116
117        #[allow(unsafe_code)]
118        unsafe {
119            let rc = killpg(proc.pgid, SIGINT);
120            if rc == -1 {
121                return Err(std::io::Error::last_os_error().into());
122            }
123        }
124
125        Ok(())
126    }
127
128    async fn wait(&mut self, id: ProcId, d: Duration) -> anyhow::Result<Option<i32>> {
129        let index = usize::try_from(id.0)?;
130        let proc = self
131            .processes
132            .get_mut(index)
133            .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
134            .as_mut()
135            .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
136        let start = Instant::now();
137        loop {
138            if let Ok(Some(code)) = proc.child.try_wait() {
139                self.processes[index] = None;
140                return Ok(Some(code.code().unwrap_or_default()));
141            }
142
143            if start.elapsed() >= d {
144                return Ok(None);
145            }
146            sleep(Duration::from_millis(50)).await;
147        }
148    }
149
150    async fn kill(&mut self, id: ProcId) -> anyhow::Result<()> {
151        let proc = self
152            .processes
153            .get(usize::try_from(id.0)?)
154            .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
155            .as_ref()
156            .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
157
158        #[allow(unsafe_code)]
159        unsafe {
160            let rc = killpg(proc.pgid, SIGKILL);
161            if rc == -1 {
162                return Err(std::io::Error::last_os_error().into());
163            }
164        }
165
166        let _ = self.wait(id, Duration::from_millis(10)).await;
167        Ok(())
168    }
169}