tutti_core/process/
unix.rs1use std::time::Duration;
2
3use futures::StreamExt;
4use libc::{killpg, setsid, SIGINT, SIGKILL};
5use tokio::{
6 io::BufReader,
7 process::{Child, Command},
8 time::{sleep, Instant},
9};
10use tokio_util::io::ReaderStream;
11
12use super::{CommandSpec, ProcId, ProcessManager, Spawned};
13
14#[derive(Debug)]
15struct ChildRec {
16 child: Child,
17 pgid: libc::pid_t,
18}
19
20#[derive(Debug)]
22pub struct UnixProcessManager {
23 processes: Vec<Option<ChildRec>>,
25}
26
27impl Default for UnixProcessManager {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl UnixProcessManager {
34 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 processes: Vec::new(),
38 }
39 }
40}
41
42#[async_trait::async_trait]
43impl ProcessManager for UnixProcessManager {
44 async fn spawn(&mut self, spec: CommandSpec) -> anyhow::Result<Spawned> {
45 anyhow::ensure!(
46 !spec.cmd.is_empty(),
47 "empty cmd for service `{}`",
48 spec.name
49 );
50
51 let mut cmd = Command::new(&spec.cmd[0]);
52 if spec.cmd.len() > 1 {
53 cmd.args(&spec.cmd[1..]);
54 }
55 if let Some(dir) = &spec.cwd {
56 cmd.current_dir(dir);
57 }
58 for (k, v) in &spec.env {
59 cmd.env(k, v);
60 }
61
62 #[allow(unsafe_code)]
63 unsafe {
64 cmd.pre_exec(|| {
65 if setsid() == -1 {
66 return Err(std::io::Error::last_os_error());
67 }
68 Ok(())
69 });
70 }
71
72 cmd.stdout(std::process::Stdio::piped())
73 .stderr(std::process::Stdio::piped());
74
75 let mut child = cmd.spawn()?;
76
77 let pid = child.id();
78
79 let stdout = child
80 .stdout
81 .take()
82 .ok_or_else(|| anyhow::anyhow!("stdout not piped"))?;
83 let stderr = child
84 .stderr
85 .take()
86 .ok_or_else(|| anyhow::anyhow!("stderr not piped"))?;
87
88 let out_stream = ReaderStream::new(BufReader::new(stdout))
89 .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
90 let err_stream = ReaderStream::new(BufReader::new(stderr))
91 .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
92
93 let id = ProcId(self.processes.len() as u64);
94 self.processes.push(Some(ChildRec {
95 child,
96 pgid: libc::pid_t::try_from(
97 pid.ok_or_else(|| anyhow::anyhow!("spawned process has no pid"))?,
98 )?,
99 }));
100
101 Ok(Spawned {
102 id,
103 pid,
104 stdout: Box::pin(out_stream),
105 stderr: Box::pin(err_stream),
106 })
107 }
108
109 async fn shutdown(&mut self, id: ProcId) -> anyhow::Result<()> {
110 let proc = self
111 .processes
112 .get(usize::try_from(id.0)?)
113 .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
114 .as_ref()
115 .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
116
117 #[allow(unsafe_code)]
118 unsafe {
119 let rc = killpg(proc.pgid, SIGINT);
120 if rc == -1 {
121 return Err(std::io::Error::last_os_error().into());
122 }
123 }
124
125 Ok(())
126 }
127
128 async fn wait(&mut self, id: ProcId, d: Duration) -> anyhow::Result<Option<i32>> {
129 let index = usize::try_from(id.0)?;
130 let proc = self
131 .processes
132 .get_mut(index)
133 .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
134 .as_mut()
135 .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
136 let start = Instant::now();
137 loop {
138 if let Ok(Some(code)) = proc.child.try_wait() {
139 self.processes[index] = None;
140 return Ok(Some(code.code().unwrap_or_default()));
141 }
142
143 if start.elapsed() >= d {
144 return Ok(None);
145 }
146 sleep(Duration::from_millis(50)).await;
147 }
148 }
149
150 async fn kill(&mut self, id: ProcId) -> anyhow::Result<()> {
151 let proc = self
152 .processes
153 .get(usize::try_from(id.0)?)
154 .ok_or_else(|| anyhow::anyhow!("unknown process id {id:?}"))?
155 .as_ref()
156 .ok_or_else(|| anyhow::anyhow!("already shutdown process id {id:?}"))?;
157
158 #[allow(unsafe_code)]
159 unsafe {
160 let rc = killpg(proc.pgid, SIGKILL);
161 if rc == -1 {
162 return Err(std::io::Error::last_os_error().into());
163 }
164 }
165
166 let _ = self.wait(id, Duration::from_millis(10)).await;
167 Ok(())
168 }
169}