tutti_core/process_manager/implementations/
unix.rs1use std::time::Duration;
2
3use futures::StreamExt;
4use libc::{killpg, setsid, SIGINT, SIGKILL};
5use tokio::{
6 io::BufReader,
7 process::{Child, Command},
8 time::{sleep, Instant},
9};
10use tokio_util::io::ReaderStream;
11
12use crate::{
13 error::{Error, Result},
14 process_manager::{
15 base::ProcessManager,
16 types::{CommandSpec, ProcId, Spawned},
17 },
18};
19
20#[derive(Debug)]
21struct ChildRec {
22 child: Child,
23 pgid: libc::pid_t,
24}
25
26#[derive(Debug)]
28pub struct UnixProcessManager {
29 processes: Vec<Option<ChildRec>>,
31}
32
33impl Default for UnixProcessManager {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl UnixProcessManager {
40 #[must_use]
41 pub fn new() -> Self {
42 Self {
43 processes: Vec::new(),
44 }
45 }
46}
47
48#[async_trait::async_trait]
49impl ProcessManager for UnixProcessManager {
50 async fn spawn(&mut self, spec: CommandSpec) -> Result<Spawned> {
51 let mut cmd = Command::new(&spec.cmd[0]);
52 if spec.cmd.len() > 1 {
53 cmd.args(&spec.cmd[1..]);
54 }
55 if let Some(dir) = &spec.cwd {
56 cmd.current_dir(dir);
57 }
58 for (k, v) in &spec.env {
59 cmd.env(k, v);
60 }
61
62 #[allow(unsafe_code)]
63 unsafe {
64 cmd.pre_exec(|| {
65 if setsid() == -1 {
66 return Err(std::io::Error::last_os_error());
67 }
68 Ok(())
69 });
70 }
71
72 cmd.stdout(std::process::Stdio::piped())
73 .stderr(std::process::Stdio::piped());
74
75 let mut child = cmd.spawn().map_err(Error::IO)?;
76
77 let pid = child.id();
78
79 let stdout = child
80 .stdout
81 .take()
82 .ok_or_else(|| Error::IO(std::io::Error::other("stdout not piped")))?;
83 let stderr = child
84 .stderr
85 .take()
86 .ok_or_else(|| Error::IO(std::io::Error::other("stdout not piped")))?;
87
88 let out_stream = ReaderStream::new(BufReader::new(stdout))
89 .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
90 let err_stream = ReaderStream::new(BufReader::new(stderr))
91 .filter_map(|res| async move { res.ok().map(|b| b.to_vec()) });
92
93 let id = ProcId(self.processes.len() as u64);
94 self.processes.push(Some(ChildRec {
95 child,
96 pgid: libc::pid_t::try_from(
97 pid.ok_or_else(|| Error::IO(std::io::Error::other("pid not available")))?,
98 )
99 .map_err(|_| Error::IO(std::io::Error::other("pid not available")))?,
100 }));
101
102 Ok(Spawned {
103 id,
104 pid,
105 stdout: Box::pin(out_stream),
106 stderr: Box::pin(err_stream),
107 })
108 }
109
110 async fn shutdown(&mut self, id: ProcId) -> Result<()> {
111 let proc = self
112 .processes
113 .get(usize::try_from(id.0).map_err(|_| {
114 Error::IO(std::io::Error::other("Cannot convert process id to usize"))
115 })?)
116 .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
117 .as_ref()
118 .ok_or_else(|| {
119 Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
120 })?;
121
122 #[allow(unsafe_code)]
123 unsafe {
124 let rc = killpg(proc.pgid, SIGINT);
125 if rc == -1 {
126 return Err(Error::IO(std::io::Error::last_os_error()));
127 }
128 }
129
130 Ok(())
131 }
132
133 async fn wait(&mut self, id: ProcId, d: Duration) -> Result<Option<i32>> {
134 let index = usize::try_from(id.0)
135 .map_err(|_| Error::IO(std::io::Error::other("Cannot convert process id to usize")))?;
136 let proc = self
137 .processes
138 .get_mut(index)
139 .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
140 .as_mut()
141 .ok_or_else(|| {
142 Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
143 })?;
144
145 let start = Instant::now();
146 loop {
147 if let Ok(Some(code)) = proc.child.try_wait() {
148 self.processes[index] = None;
149 return Ok(Some(code.code().unwrap_or_default()));
150 }
151
152 if start.elapsed() >= d {
153 return Ok(None);
154 }
155 sleep(Duration::from_millis(50)).await;
156 }
157 }
158
159 async fn kill(&mut self, id: ProcId) -> Result<()> {
160 let proc = self
161 .processes
162 .get(usize::try_from(id.0).map_err(|_| {
163 Error::IO(std::io::Error::other("Cannot convert process id to usize"))
164 })?)
165 .ok_or_else(|| Error::IO(std::io::Error::other("unknown process id {id:?}")))?
166 .as_ref()
167 .ok_or_else(|| {
168 Error::IO(std::io::Error::other("already shutdown process id {id:?}"))
169 })?;
170
171 #[allow(unsafe_code)]
172 unsafe {
173 let rc = killpg(proc.pgid, SIGKILL);
174 if rc == -1 {
175 return Err(Error::IO(std::io::Error::last_os_error()));
176 }
177 }
178
179 Ok(())
180 }
181}