1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo};
2use chrono::{DateTime, Local};
3use command_group::{CommandGroup, GroupChild};
4use log::info;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::io::{BufRead, BufReader, Read};
8use std::process::{self, Command, Stdio};
9use std::sync::{mpsc, Arc, Mutex};
10use std::thread;
11use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
12
13pub struct Runner {
15 pub proc: GroupChild,
16 pub info: ProcInfo,
17 pub restart_info: RestartInfo,
18 pub user_terminated: bool,
20 pub output: Arc<Mutex<OutputBuffer>>,
21}
22
23#[derive(Clone, Serialize, Deserialize, Debug)]
25pub struct ProcInfo {
26 pub job_id: JobId,
27 pub pid: Pid,
28 pub cmd_args: Vec<String>,
29 pub state: ProcStatus,
30 pub start: DateTime<Local>,
31 pub end: Option<DateTime<Local>>,
32 pub cpu: f32,
35 pub memory: u64,
38 pub virtual_memory: u64,
41 pub total_written_bytes: u64,
44 pub written_bytes: u64,
46 pub total_read_bytes: u64,
48 pub read_bytes: u64,
50}
51
52impl ProcInfo {
53 pub fn program(&self) -> &str {
54 self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
55 }
56}
57
58#[derive(Clone, Serialize, Deserialize, Debug)]
59pub enum ProcStatus {
60 Spawned,
61 Running,
62 ExitOk,
63 ExitErr(i32),
64 Unknown(String),
65}
66
67impl ProcStatus {
68 pub fn exited(&self) -> bool {
69 matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
70 }
71}
72
73#[derive(Clone, Serialize, Deserialize, Debug)]
75pub struct LogLine {
76 pub ts: DateTime<Local>,
77 pub job_id: JobId,
78 pub pid: Pid,
79 pub line: String,
80 pub is_stderr: bool,
81}
82
83impl LogLine {
84 pub fn log(&self, formatter: &Formatter) {
85 let dt = self.ts.format("%F %T%.3f");
86 let job_id = self.job_id;
87 let pid = self.pid;
88 let line = &self.line;
89 let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
90 println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
91 }
92}
93
94pub struct OutputBuffer {
96 lines: VecDeque<LogLine>,
97 max_len: Option<usize>,
98}
99
100impl OutputBuffer {
101 pub fn new(max_len: Option<usize>) -> Self {
102 OutputBuffer {
103 max_len,
104 lines: VecDeque::new(),
105 }
106 }
107 pub fn push(&mut self, line: LogLine) {
108 self.lines.push_back(line);
109 if let Some(max_len) = self.max_len {
110 if self.lines.len() > max_len {
111 let _ = self.lines.pop_front();
112 }
113 }
114 }
115 pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
116 let ts = *last_seen;
117 if let Some(entry) = self.lines.back() {
118 *last_seen = entry.ts;
119 }
120 self.lines.iter().skip_while(move |entry| ts >= entry.ts)
121 }
122}
123
124impl Runner {
125 pub fn spawn(
126 job_id: JobId,
127 args: &[String],
128 restart_info: RestartInfo,
129 channel: mpsc::Sender<Pid>,
130 ) -> Result<Self, DispatcherError> {
131 let cmd_args = args.to_vec();
132 let mut cmd = VecDeque::from(args.to_owned());
133 let Some(exe) = cmd.pop_front() else {
134 return Err(DispatcherError::EmptyProcCommandError);
135 };
136 let mut child = Command::new(exe)
139 .args(cmd)
140 .stdin(Stdio::piped())
141 .stdout(Stdio::piped())
142 .stderr(Stdio::piped())
143 .group_spawn()
145 .map_err(DispatcherError::ProcSpawnError)?;
146 let pid = child.id();
147
148 let max_len = 200; let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
151
152 let buffer = output.clone();
153 let stdout = child.inner().stdout.take().unwrap();
154 let _stdout_handle = thread::spawn(move || {
155 output_listener(
156 BufReader::new(stdout),
157 job_id,
158 pid,
159 false,
160 buffer,
161 Some(channel),
162 )
163 });
164
165 let buffer = output.clone();
166 let stderr = child.inner().stderr.take().unwrap();
167 let _stderr_handle = thread::spawn(move || {
168 output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
169 });
170
171 let info = ProcInfo {
172 job_id,
173 pid,
174 cmd_args,
175 state: ProcStatus::Spawned,
176 start: Local::now(),
177 end: None,
178 cpu: 0.0,
179 memory: 0,
180 virtual_memory: 0,
181 total_written_bytes: 0,
182 written_bytes: 0,
183 total_read_bytes: 0,
184 read_bytes: 0,
185 };
186
187 let child_proc = Runner {
188 proc: child,
189 info,
190 restart_info,
191 user_terminated: false,
192 output,
193 };
194 Ok(child_proc)
195 }
196 pub fn update_proc_state(&mut self) -> &ProcInfo {
197 if self.info.end.is_none() {
198 self.info.state = match self.proc.try_wait() {
199 Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
200 Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
201 Ok(None) => ProcStatus::Running,
202 Err(e) => ProcStatus::Unknown(e.to_string()),
203 };
204 }
205 &self.info
206 }
207 pub fn is_running(&mut self) -> bool {
208 !self.update_proc_state().state.exited()
209 }
210 pub fn terminate(&mut self) -> Result<(), std::io::Error> {
211 info!("Terminating process {}", self.proc.id());
212 self.proc.kill()?;
213 Ok(())
214 }
215}
216
217impl Drop for Runner {
218 fn drop(&mut self) {
219 self.terminate().ok();
220 }
221}
222
223fn output_listener<R: Read>(
224 reader: BufReader<R>,
225 job_id: JobId,
226 pid: Pid,
227 is_stderr: bool,
228 buffer: Arc<Mutex<OutputBuffer>>,
229 channel: Option<mpsc::Sender<Pid>>,
230) {
231 reader.lines().map_while(Result::ok).for_each(|line| {
232 let ts = Local::now();
233 if is_stderr {
234 eprintln!("[{job_id}|{pid}] {line}");
235 } else {
236 println!("[[{job_id}|{pid}] {line}");
237 }
238 if let Ok(mut buffer) = buffer.lock() {
239 let entry = LogLine {
240 ts,
241 job_id,
242 pid,
243 is_stderr,
244 line,
245 };
246 buffer.push(entry);
247 }
248 });
249 if let Some(channel) = channel {
250 let ts = Local::now();
251 if let Ok(mut buffer) = buffer.lock() {
252 let entry = LogLine {
253 ts,
254 job_id,
255 pid,
256 is_stderr,
257 line: "<process terminated>".to_string(),
258 };
259 buffer.push(entry);
260 }
261 channel.send(pid).unwrap();
263 }
264}
265
266pub fn get_user_name() -> Option<String> {
268 let system = System::new_with_specifics(
269 RefreshKind::new()
270 .with_processes(ProcessRefreshKind::new().with_user(UpdateKind::OnlyIfNotSet)),
271 );
272 let users = Users::new_with_refreshed_list();
273 let pid = process::id();
274 system
275 .process(sysinfo::Pid::from_u32(pid))
276 .and_then(|proc| {
277 proc.effective_user_id()
278 .or(proc.user_id())
279 .and_then(|uid| users.get_user_by_id(uid))
280 .map(|user| user.name().to_string())
281 })
282}