1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo};
2use chrono::{DateTime, Local};
3use log::info;
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6use std::io::{BufRead, BufReader, Read};
7use std::process::{self, Child, Command, Stdio};
8use std::sync::{mpsc, Arc, Mutex};
9use std::thread;
10use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
11
12pub struct Runner {
14 pub proc: Child,
15 pub info: ProcInfo,
16 pub restart_info: RestartInfo,
17 pub user_terminated: bool,
19 pub output: Arc<Mutex<OutputBuffer>>,
20}
21
22#[derive(Clone, Serialize, Deserialize, Debug)]
24pub struct ProcInfo {
25 pub job_id: JobId,
26 pub pid: Pid,
27 pub cmd_args: Vec<String>,
28 pub state: ProcStatus,
29 pub start: DateTime<Local>,
30 pub end: Option<DateTime<Local>>,
31 pub cpu: f32,
34 pub memory: u64,
37 pub virtual_memory: u64,
40 pub total_written_bytes: u64,
43 pub written_bytes: u64,
45 pub total_read_bytes: u64,
47 pub read_bytes: u64,
49}
50
51impl ProcInfo {
52 pub fn program(&self) -> &str {
53 self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
54 }
55}
56
57#[derive(Clone, Serialize, Deserialize, Debug)]
58pub enum ProcStatus {
59 Spawned,
60 Running,
61 ExitOk,
62 ExitErr(i32),
63 Unknown(String),
64}
65
66impl ProcStatus {
67 pub fn exited(&self) -> bool {
68 matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
69 }
70}
71
72#[derive(Clone, Serialize, Deserialize, Debug)]
74pub struct LogLine {
75 pub ts: DateTime<Local>,
76 pub job_id: JobId,
77 pub pid: Pid,
78 pub line: String,
79 pub is_stderr: bool,
80}
81
82impl LogLine {
83 pub fn log(&self, formatter: &Formatter) {
84 let dt = self.ts.format("%F %T%.3f");
85 let job_id = self.job_id;
86 let pid = self.pid;
87 let line = &self.line;
88 let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
89 println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
90 }
91}
92
93pub struct OutputBuffer {
95 lines: VecDeque<LogLine>,
96 max_len: Option<usize>,
97}
98
99impl OutputBuffer {
100 pub fn new(max_len: Option<usize>) -> Self {
101 OutputBuffer {
102 max_len,
103 lines: VecDeque::new(),
104 }
105 }
106 pub fn push(&mut self, line: LogLine) {
107 self.lines.push_back(line);
108 if let Some(max_len) = self.max_len {
109 if self.lines.len() > max_len {
110 let _ = self.lines.pop_front();
111 }
112 }
113 }
114 pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
115 let ts = *last_seen;
116 if let Some(entry) = self.lines.back() {
117 *last_seen = entry.ts;
118 }
119 self.lines.iter().skip_while(move |entry| ts >= entry.ts)
120 }
121}
122
123impl Runner {
124 pub fn spawn(
125 job_id: JobId,
126 args: &[String],
127 restart_info: RestartInfo,
128 channel: mpsc::Sender<Pid>,
129 ) -> Result<Self, DispatcherError> {
130 let cmd_args = args.to_vec();
131 let mut cmd = VecDeque::from(args.to_owned());
132 let Some(exe) = cmd.pop_front() else {
133 return Err(DispatcherError::EmptyProcCommandError);
134 };
135 let mut child = Command::new(exe)
138 .args(cmd)
139 .stdin(Stdio::piped())
140 .stdout(Stdio::piped())
141 .stderr(Stdio::piped())
142 .spawn()
143 .map_err(DispatcherError::ProcSpawnError)?;
144 let pid = child.id();
145
146 let max_len = 200; let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
149
150 let buffer = output.clone();
151 let stdout = child.stdout.take().unwrap();
152 let _stdout_handle = thread::spawn(move || {
153 output_listener(
154 BufReader::new(stdout),
155 job_id,
156 pid,
157 false,
158 buffer,
159 Some(channel),
160 )
161 });
162
163 let buffer = output.clone();
164 let stderr = child.stderr.take().unwrap();
165 let _stderr_handle = thread::spawn(move || {
166 output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
167 });
168
169 let info = ProcInfo {
170 job_id,
171 pid,
172 cmd_args,
173 state: ProcStatus::Spawned,
174 start: Local::now(),
175 end: None,
176 cpu: 0.0,
177 memory: 0,
178 virtual_memory: 0,
179 total_written_bytes: 0,
180 written_bytes: 0,
181 total_read_bytes: 0,
182 read_bytes: 0,
183 };
184
185 let child_proc = Runner {
186 proc: child,
187 info,
188 restart_info,
189 user_terminated: false,
190 output,
191 };
192 Ok(child_proc)
193 }
194 pub fn update_proc_state(&mut self) -> &ProcInfo {
195 if self.info.end.is_none() {
196 self.info.state = match self.proc.try_wait() {
197 Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
198 Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
199 Ok(None) => ProcStatus::Running,
200 Err(e) => ProcStatus::Unknown(e.to_string()),
201 };
202 }
203 &self.info
204 }
205 pub fn is_running(&mut self) -> bool {
206 !self.update_proc_state().state.exited()
207 }
208 pub fn terminate(&mut self) -> Result<(), std::io::Error> {
209 if self.info.program() == "just" {
210 let just_pid = self.proc.id() as usize;
212 let system = System::new_with_specifics(
213 RefreshKind::new().with_processes(ProcessRefreshKind::new()),
214 );
215 if let Some((pid, process)) = system.processes().iter().find(|(_pid, process)| {
216 process.parent().unwrap_or(0.into()) == just_pid.into()
217 && process.name() != "ctrl-c"
218 }) {
219 info!("Terminating process {pid} (parent process {just_pid})");
220 process.kill(); }
222 } else {
226 info!("Terminating process {}", self.proc.id());
227 self.proc.kill()?;
228 }
229 Ok(())
230 }
231}
232
233impl Drop for Runner {
234 fn drop(&mut self) {
235 self.terminate().ok();
236 }
237}
238
239fn output_listener<R: Read>(
240 reader: BufReader<R>,
241 job_id: JobId,
242 pid: Pid,
243 is_stderr: bool,
244 buffer: Arc<Mutex<OutputBuffer>>,
245 channel: Option<mpsc::Sender<Pid>>,
246) {
247 reader.lines().map_while(Result::ok).for_each(|line| {
248 let ts = Local::now();
249 if is_stderr {
250 eprintln!("[{job_id}|{pid}] {line}");
251 } else {
252 println!("[[{job_id}|{pid}] {line}");
253 }
254 if let Ok(mut buffer) = buffer.lock() {
255 let entry = LogLine {
256 ts,
257 job_id,
258 pid,
259 is_stderr,
260 line,
261 };
262 buffer.push(entry);
263 }
264 });
265 if let Some(channel) = channel {
266 let ts = Local::now();
267 if let Ok(mut buffer) = buffer.lock() {
268 let entry = LogLine {
269 ts,
270 job_id,
271 pid,
272 is_stderr,
273 line: "<process terminated>".to_string(),
274 };
275 buffer.push(entry);
276 }
277 channel.send(pid).unwrap();
279 }
280}
281
282pub fn get_user_name() -> Option<String> {
284 let system = System::new_with_specifics(
285 RefreshKind::new()
286 .with_processes(ProcessRefreshKind::new().with_user(UpdateKind::OnlyIfNotSet)),
287 );
288 let users = Users::new_with_refreshed_list();
289 let pid = process::id();
290 system
291 .process(sysinfo::Pid::from_u32(pid))
292 .and_then(|proc| {
293 proc.effective_user_id()
294 .or(proc.user_id())
295 .and_then(|uid| users.get_user_by_id(uid))
296 .map(|user| user.name().to_string())
297 })
298}