1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo, RestartPolicy};
2use chrono::{DateTime, Local, TimeDelta};
3use command_group::{CommandGroup, GroupChild};
4use log::info;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::io::{BufRead, BufReader, Read};
8use std::process::{self, Command, Stdio};
9use std::sync::{mpsc, Arc, Mutex};
10use std::thread;
11use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
12
13pub struct Runner {
15 pub proc: GroupChild,
16 pub info: ProcInfo,
17 pub restart_info: RestartInfo,
18 pub user_terminated: bool,
20 pub output: Arc<Mutex<OutputBuffer>>,
21}
22
23#[derive(Clone, Serialize, Deserialize, Debug)]
25pub struct ProcInfo {
26 pub job_id: JobId,
27 pub pid: Pid,
28 pub cmd_args: Vec<String>,
29 pub state: ProcStatus,
30 pub start: DateTime<Local>,
31 pub end: Option<DateTime<Local>>,
32 pub cpu: f32,
35 pub memory: u64,
38 pub virtual_memory: u64,
41 pub total_written_bytes: u64,
44 pub written_bytes: u64,
46 pub total_read_bytes: u64,
48 pub read_bytes: u64,
50}
51
52pub struct JobSpawnInfo {
54 pub job_id: JobId,
55 pub args: Vec<String>,
56 pub restart_info: RestartInfo,
57}
58
59impl ProcInfo {
60 pub fn program(&self) -> &str {
61 self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
62 }
63}
64
65#[derive(Clone, Serialize, Deserialize, Debug)]
66pub enum ProcStatus {
67 Spawned,
68 Running,
69 ExitOk,
70 ExitErr(i32),
71 Unknown(String),
72}
73
74impl ProcStatus {
75 pub fn exited(&self) -> bool {
76 matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
77 }
78}
79
80#[derive(Clone, Serialize, Deserialize, Debug)]
82pub struct LogLine {
83 pub ts: DateTime<Local>,
84 pub job_id: JobId,
85 pub pid: Pid,
86 pub line: String,
87 pub is_stderr: bool,
88}
89
90impl LogLine {
91 pub fn log(&self, formatter: &Formatter) {
92 let dt = self.ts.format("%F %T%.3f");
93 let job_id = self.job_id;
94 let pid = self.pid;
95 let line = &self.line;
96 let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
97 println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
98 }
99}
100
101pub struct OutputBuffer {
103 lines: VecDeque<LogLine>,
104 max_len: Option<usize>,
105}
106
107impl OutputBuffer {
108 pub fn new(max_len: Option<usize>) -> Self {
109 OutputBuffer {
110 max_len,
111 lines: VecDeque::new(),
112 }
113 }
114 pub fn push(&mut self, line: LogLine) {
115 self.lines.push_back(line);
116 if let Some(max_len) = self.max_len {
117 if self.lines.len() > max_len {
118 let _ = self.lines.pop_front();
119 }
120 }
121 }
122 pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
123 let ts = *last_seen;
124 if let Some(entry) = self.lines.back() {
125 *last_seen = entry.ts;
126 }
127 self.lines.iter().skip_while(move |entry| ts >= entry.ts)
128 }
129}
130
131impl Runner {
132 pub fn spawn(
133 job_id: JobId,
134 args: &[String],
135 restart_info: RestartInfo,
136 channel: mpsc::Sender<Pid>,
137 ) -> Result<Self, DispatcherError> {
138 let cmd_args = args.to_vec();
139 let mut cmd = VecDeque::from(args.to_owned());
140 let Some(exe) = cmd.pop_front() else {
141 return Err(DispatcherError::EmptyProcCommandError);
142 };
143 let mut child = Command::new(exe)
146 .args(cmd)
147 .stdin(Stdio::piped())
148 .stdout(Stdio::piped())
149 .stderr(Stdio::piped())
150 .group_spawn()
152 .map_err(|err| DispatcherError::ProcSpawnError(cmd_args.join(" "), err))?;
153 let pid = child.id();
154
155 let max_len = 200; let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
158
159 let buffer = output.clone();
160 let stdout = child.inner().stdout.take().unwrap();
161 let _stdout_handle = thread::spawn(move || {
162 output_listener(
163 BufReader::new(stdout),
164 job_id,
165 pid,
166 false,
167 buffer,
168 Some(channel),
169 )
170 });
171
172 let buffer = output.clone();
173 let stderr = child.inner().stderr.take().unwrap();
174 let _stderr_handle = thread::spawn(move || {
175 output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
176 });
177
178 let info = ProcInfo {
179 job_id,
180 pid,
181 cmd_args,
182 state: ProcStatus::Spawned,
183 start: Local::now(),
184 end: None,
185 cpu: 0.0,
186 memory: 0,
187 virtual_memory: 0,
188 total_written_bytes: 0,
189 written_bytes: 0,
190 total_read_bytes: 0,
191 read_bytes: 0,
192 };
193
194 let child_proc = Runner {
195 proc: child,
196 info,
197 restart_info,
198 user_terminated: false,
199 output,
200 };
201 Ok(child_proc)
202 }
203 pub fn update_proc_state(&mut self) -> &ProcInfo {
204 if self.info.end.is_none() {
205 self.info.state = match self.proc.try_wait() {
206 Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
207 Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
208 Ok(None) => ProcStatus::Running,
209 Err(e) => ProcStatus::Unknown(e.to_string()),
210 };
211 }
212 &self.info
213 }
214 pub fn is_running(&mut self) -> bool {
215 !self.update_proc_state().state.exited()
216 }
217 pub fn terminate(&mut self) -> Result<(), std::io::Error> {
218 info!("Terminating process {}", self.proc.id());
219 self.proc.kill()?;
220 Ok(())
221 }
222 pub fn restart_infos(&mut self) -> Option<JobSpawnInfo> {
223 let respawn = !self.user_terminated
224 && match self.restart_info.policy {
225 RestartPolicy::Always => true,
226 RestartPolicy::OnFailure => {
227 matches!(self.info.state, ProcStatus::ExitErr(code) if code > 0)
228 }
229 RestartPolicy::Never => false,
230 };
231 if respawn {
232 let last_duration = self.info.end.unwrap_or(Local::now()) - self.info.start;
233 let mut restart_info = self.restart_info.clone();
234 if last_duration > TimeDelta::milliseconds(50) {
235 restart_info.wait_time = 50;
237 } else {
238 restart_info.wait_time *= 2;
239 }
240 Some(JobSpawnInfo {
241 job_id: self.info.job_id,
242 args: self.info.cmd_args.clone(),
243 restart_info,
244 })
245 } else {
246 None
247 }
248 }
249}
250
251impl Drop for Runner {
252 fn drop(&mut self) {
253 self.terminate().ok();
254 }
255}
256
257fn output_listener<R: Read>(
258 reader: BufReader<R>,
259 job_id: JobId,
260 pid: Pid,
261 is_stderr: bool,
262 buffer: Arc<Mutex<OutputBuffer>>,
263 channel: Option<mpsc::Sender<Pid>>,
264) {
265 reader.lines().map_while(Result::ok).for_each(|line| {
266 let ts = Local::now();
267 if is_stderr {
268 eprintln!("[{job_id}|{pid}] {line}");
269 } else {
270 println!("[[{job_id}|{pid}] {line}");
271 }
272 if let Ok(mut buffer) = buffer.lock() {
273 let entry = LogLine {
274 ts,
275 job_id,
276 pid,
277 is_stderr,
278 line,
279 };
280 buffer.push(entry);
281 }
282 });
283 if let Some(channel) = channel {
284 let ts = Local::now();
285 if let Ok(mut buffer) = buffer.lock() {
286 let entry = LogLine {
287 ts,
288 job_id,
289 pid,
290 is_stderr,
291 line: "<process terminated>".to_string(),
292 };
293 buffer.push(entry);
294 }
295 channel.send(pid).unwrap();
297 }
298}
299
300pub fn get_user_name() -> Option<String> {
302 let system = System::new_with_specifics(
303 RefreshKind::nothing()
304 .with_processes(ProcessRefreshKind::nothing().with_user(UpdateKind::OnlyIfNotSet)),
305 );
306 let users = Users::new_with_refreshed_list();
307 let pid = process::id();
308 system
309 .process(sysinfo::Pid::from_u32(pid))
310 .and_then(|proc| {
311 proc.effective_user_id()
312 .or(proc.user_id())
313 .and_then(|uid| users.get_user_by_id(uid))
314 .map(|user| user.name().to_string())
315 })
316}