shell_compose/
runner.rs

1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo, RestartPolicy};
2use chrono::{DateTime, Local, TimeDelta};
3use command_group::{CommandGroup, GroupChild};
4use log::info;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::io::{BufRead, BufReader, Read};
8use std::process::{self, Command, Stdio};
9use std::sync::{mpsc, Arc, Mutex};
10use std::thread;
11use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
12
13/// Child process controller
14pub struct Runner {
15    pub proc: GroupChild,
16    pub info: ProcInfo,
17    pub restart_info: RestartInfo,
18    /// Flag set in stop/down command to prevent restart
19    pub user_terminated: bool,
20    pub output: Arc<Mutex<OutputBuffer>>,
21}
22
23/// Process information
24#[derive(Clone, Serialize, Deserialize, Debug)]
25pub struct ProcInfo {
26    pub job_id: JobId,
27    pub pid: Pid,
28    pub cmd_args: Vec<String>,
29    pub state: ProcStatus,
30    pub start: DateTime<Local>,
31    pub end: Option<DateTime<Local>>,
32    /// Total CPU usage (in %)
33    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage>
34    pub cpu: f32,
35    /// Memory usage (in bytes).
36    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.memory>
37    pub memory: u64,
38    /// Virtual memory usage (in bytes).
39    /// <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.virtual_memory>
40    pub virtual_memory: u64,
41    /// Total number of written bytes.
42    /// <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.disk_usage>
43    pub total_written_bytes: u64,
44    /// Written bytes per second.
45    pub written_bytes: u64,
46    /// Total number of read bytes.
47    pub total_read_bytes: u64,
48    /// Read bytes per second.
49    pub read_bytes: u64,
50}
51
52/// Required information for spawning a new process.
53pub struct JobSpawnInfo {
54    pub job_id: JobId,
55    pub args: Vec<String>,
56    pub restart_info: RestartInfo,
57}
58
59impl ProcInfo {
60    pub fn program(&self) -> &str {
61        self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
62    }
63}
64
65#[derive(Clone, Serialize, Deserialize, Debug)]
66pub enum ProcStatus {
67    Spawned,
68    Running,
69    ExitOk,
70    ExitErr(i32),
71    Unknown(String),
72}
73
74impl ProcStatus {
75    pub fn exited(&self) -> bool {
76        matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
77    }
78}
79
80/// Log line from captured stdout/stderr output
81#[derive(Clone, Serialize, Deserialize, Debug)]
82pub struct LogLine {
83    pub ts: DateTime<Local>,
84    pub job_id: JobId,
85    pub pid: Pid,
86    pub line: String,
87    pub is_stderr: bool,
88}
89
90impl LogLine {
91    pub fn log(&self, formatter: &Formatter) {
92        let dt = self.ts.format("%F %T%.3f");
93        let job_id = self.job_id;
94        let pid = self.pid;
95        let line = &self.line;
96        let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
97        println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
98    }
99}
100
101/// Buffer for captured stdout/stderr output
102pub struct OutputBuffer {
103    lines: VecDeque<LogLine>,
104    max_len: Option<usize>,
105}
106
107impl OutputBuffer {
108    pub fn new(max_len: Option<usize>) -> Self {
109        OutputBuffer {
110            max_len,
111            lines: VecDeque::new(),
112        }
113    }
114    pub fn push(&mut self, line: LogLine) {
115        self.lines.push_back(line);
116        if let Some(max_len) = self.max_len {
117            if self.lines.len() > max_len {
118                let _ = self.lines.pop_front();
119            }
120        }
121    }
122    pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
123        let ts = *last_seen;
124        if let Some(entry) = self.lines.back() {
125            *last_seen = entry.ts;
126        }
127        self.lines.iter().skip_while(move |entry| ts >= entry.ts)
128    }
129}
130
131impl Runner {
132    pub fn spawn(
133        job_id: JobId,
134        args: &[String],
135        restart_info: RestartInfo,
136        channel: mpsc::Sender<Pid>,
137    ) -> Result<Self, DispatcherError> {
138        let cmd_args = args.to_vec();
139        let mut cmd = VecDeque::from(args.to_owned());
140        let Some(exe) = cmd.pop_front() else {
141            return Err(DispatcherError::EmptyProcCommandError);
142        };
143        // info!("Spawning {exe} {cmd:?}");
144
145        let mut child = Command::new(exe)
146            .args(cmd)
147            .stdin(Stdio::piped())
148            .stdout(Stdio::piped())
149            .stderr(Stdio::piped())
150            // spawn process group (https://biriukov.dev/docs/fd-pipe-session-terminal/3-process-groups-jobs-and-sessions/)
151            .group_spawn()
152            .map_err(|err| DispatcherError::ProcSpawnError(cmd_args.join(" "), err))?;
153        let pid = child.id();
154
155        // output listeners
156        let max_len = 200; // TODO: Make configurable
157        let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
158
159        let buffer = output.clone();
160        let stdout = child.inner().stdout.take().unwrap();
161        let _stdout_handle = thread::spawn(move || {
162            output_listener(
163                BufReader::new(stdout),
164                job_id,
165                pid,
166                false,
167                buffer,
168                Some(channel),
169            )
170        });
171
172        let buffer = output.clone();
173        let stderr = child.inner().stderr.take().unwrap();
174        let _stderr_handle = thread::spawn(move || {
175            output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
176        });
177
178        let info = ProcInfo {
179            job_id,
180            pid,
181            cmd_args,
182            state: ProcStatus::Spawned,
183            start: Local::now(),
184            end: None,
185            cpu: 0.0,
186            memory: 0,
187            virtual_memory: 0,
188            total_written_bytes: 0,
189            written_bytes: 0,
190            total_read_bytes: 0,
191            read_bytes: 0,
192        };
193
194        let child_proc = Runner {
195            proc: child,
196            info,
197            restart_info,
198            user_terminated: false,
199            output,
200        };
201        Ok(child_proc)
202    }
203    pub fn update_proc_state(&mut self) -> &ProcInfo {
204        if self.info.end.is_none() {
205            self.info.state = match self.proc.try_wait() {
206                Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
207                Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
208                Ok(None) => ProcStatus::Running,
209                Err(e) => ProcStatus::Unknown(e.to_string()),
210            };
211        }
212        &self.info
213    }
214    pub fn is_running(&mut self) -> bool {
215        !self.update_proc_state().state.exited()
216    }
217    pub fn terminate(&mut self) -> Result<(), std::io::Error> {
218        info!("Terminating process {}", self.proc.id());
219        self.proc.kill()?;
220        Ok(())
221    }
222    pub fn restart_infos(&mut self) -> Option<JobSpawnInfo> {
223        let respawn = !self.user_terminated
224            && match self.restart_info.policy {
225                RestartPolicy::Always => true,
226                RestartPolicy::OnFailure => {
227                    matches!(self.info.state, ProcStatus::ExitErr(code) if code > 0)
228                }
229                RestartPolicy::Never => false,
230            };
231        if respawn {
232            let last_duration = self.info.end.unwrap_or(Local::now()) - self.info.start;
233            let mut restart_info = self.restart_info.clone();
234            if last_duration > TimeDelta::milliseconds(50) {
235                // Reset wait time after a long run
236                restart_info.wait_time = 50;
237            } else {
238                restart_info.wait_time *= 2;
239            }
240            Some(JobSpawnInfo {
241                job_id: self.info.job_id,
242                args: self.info.cmd_args.clone(),
243                restart_info,
244            })
245        } else {
246            None
247        }
248    }
249}
250
251impl Drop for Runner {
252    fn drop(&mut self) {
253        self.terminate().ok();
254    }
255}
256
257fn output_listener<R: Read>(
258    reader: BufReader<R>,
259    job_id: JobId,
260    pid: Pid,
261    is_stderr: bool,
262    buffer: Arc<Mutex<OutputBuffer>>,
263    channel: Option<mpsc::Sender<Pid>>,
264) {
265    reader.lines().map_while(Result::ok).for_each(|line| {
266        let ts = Local::now();
267        if is_stderr {
268            eprintln!("[{job_id}|{pid}] {line}");
269        } else {
270            println!("[[{job_id}|{pid}] {line}");
271        }
272        if let Ok(mut buffer) = buffer.lock() {
273            let entry = LogLine {
274                ts,
275                job_id,
276                pid,
277                is_stderr,
278                line,
279            };
280            buffer.push(entry);
281        }
282    });
283    if let Some(channel) = channel {
284        let ts = Local::now();
285        if let Ok(mut buffer) = buffer.lock() {
286            let entry = LogLine {
287                ts,
288                job_id,
289                pid,
290                is_stderr,
291                line: "<process terminated>".to_string(),
292            };
293            buffer.push(entry);
294        }
295        // Notify watcher
296        channel.send(pid).unwrap();
297    }
298}
299
300/// Current user
301pub fn get_user_name() -> Option<String> {
302    let system = System::new_with_specifics(
303        RefreshKind::nothing()
304            .with_processes(ProcessRefreshKind::nothing().with_user(UpdateKind::OnlyIfNotSet)),
305    );
306    let users = Users::new_with_refreshed_list();
307    let pid = process::id();
308    system
309        .process(sysinfo::Pid::from_u32(pid))
310        .and_then(|proc| {
311            proc.effective_user_id()
312                .or(proc.user_id())
313                .and_then(|uid| users.get_user_by_id(uid))
314                .map(|user| user.name().to_string())
315        })
316}