shell_compose/
runner.rs

1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo};
2use chrono::{DateTime, Local};
3use command_group::{CommandGroup, GroupChild};
4use log::info;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::io::{BufRead, BufReader, Read};
8use std::process::{self, Command, Stdio};
9use std::sync::{mpsc, Arc, Mutex};
10use std::thread;
11use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
12
/// Child process controller
///
/// Bundles the handle of a spawned process group with its bookkeeping record
/// and the buffer that collects its captured output.
pub struct Runner {
    /// Handle of the spawned process group
    pub proc: GroupChild,
    /// Status and resource usage record of the process
    pub info: ProcInfo,
    /// Restart settings passed to `Runner::spawn`
    pub restart_info: RestartInfo,
    /// Flag set in stop/down command to prevent restart
    pub user_terminated: bool,
    /// Captured stdout/stderr lines, shared with the output listener threads
    pub output: Arc<Mutex<OutputBuffer>>,
}
22
/// Process information
///
/// Serializable record of a child process: identity, command line, status
/// and resource usage counters.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ProcInfo {
    /// Id of the job this process was spawned for
    pub job_id: JobId,
    /// Id of the spawned child process
    pub pid: Pid,
    /// Full command line; the first element is the executable (see `program`)
    pub cmd_args: Vec<String>,
    /// Last observed process status
    pub state: ProcStatus,
    /// Time at which the process was spawned
    pub start: DateTime<Local>,
    /// End time of the process; `None` right after spawning.
    // NOTE(review): nothing in this file assigns `end` — presumably set by the caller; confirm.
    pub end: Option<DateTime<Local>>,
    /// Total CPU usage (in %)
    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage>
    pub cpu: f32,
    /// Memory usage (in bytes).
    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.memory>
    pub memory: u64,
    /// Virtual memory usage (in bytes).
    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.virtual_memory>
    pub virtual_memory: u64,
    /// Total number of written bytes.
    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.disk_usage>
    pub total_written_bytes: u64,
    /// Written bytes per second.
    pub written_bytes: u64,
    /// Total number of read bytes.
    pub total_read_bytes: u64,
    /// Read bytes per second.
    pub read_bytes: u64,
}
51
52impl ProcInfo {
53    pub fn program(&self) -> &str {
54        self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
55    }
56}
57
/// Lifecycle state of a child process
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum ProcStatus {
    /// Initial state assigned when the process is spawned
    Spawned,
    /// The process had not exited when its status was last polled
    Running,
    /// The process exited with a success status
    ExitOk,
    /// The process exited unsuccessfully; holds the exit code (0 if no code was available)
    ExitErr(i32),
    /// Polling the process status failed; holds the error message
    Unknown(String),
}
66
67impl ProcStatus {
68    pub fn exited(&self) -> bool {
69        matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
70    }
71}
72
/// Log line from captured stdout/stderr output
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct LogLine {
    /// Timestamp assigned by the output listener
    pub ts: DateTime<Local>,
    /// Id of the job whose process produced the line
    pub job_id: JobId,
    /// Id of the process that produced the line
    pub pid: Pid,
    /// Line content without the trailing line break
    pub line: String,
    /// `true` if the line was read from stderr, `false` for stdout
    pub is_stderr: bool,
}
82
83impl LogLine {
84    pub fn log(&self, formatter: &Formatter) {
85        let dt = self.ts.format("%F %T%.3f");
86        let job_id = self.job_id;
87        let pid = self.pid;
88        let line = &self.line;
89        let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
90        println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
91    }
92}
93
/// Buffer for captured stdout/stderr output
pub struct OutputBuffer {
    /// Captured lines, oldest at the front
    lines: VecDeque<LogLine>,
    /// Maximum number of retained lines; `None` means unbounded
    max_len: Option<usize>,
}
99
100impl OutputBuffer {
101    pub fn new(max_len: Option<usize>) -> Self {
102        OutputBuffer {
103            max_len,
104            lines: VecDeque::new(),
105        }
106    }
107    pub fn push(&mut self, line: LogLine) {
108        self.lines.push_back(line);
109        if let Some(max_len) = self.max_len {
110            if self.lines.len() > max_len {
111                let _ = self.lines.pop_front();
112            }
113        }
114    }
115    pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
116        let ts = *last_seen;
117        if let Some(entry) = self.lines.back() {
118            *last_seen = entry.ts;
119        }
120        self.lines.iter().skip_while(move |entry| ts >= entry.ts)
121    }
122}
123
124impl Runner {
125    pub fn spawn(
126        job_id: JobId,
127        args: &[String],
128        restart_info: RestartInfo,
129        channel: mpsc::Sender<Pid>,
130    ) -> Result<Self, DispatcherError> {
131        let cmd_args = args.to_vec();
132        let mut cmd = VecDeque::from(args.to_owned());
133        let Some(exe) = cmd.pop_front() else {
134            return Err(DispatcherError::EmptyProcCommandError);
135        };
136        // info!("Spawning {exe} {cmd:?}");
137
138        let mut child = Command::new(exe)
139            .args(cmd)
140            .stdin(Stdio::piped())
141            .stdout(Stdio::piped())
142            .stderr(Stdio::piped())
143            // spawn process group (https://biriukov.dev/docs/fd-pipe-session-terminal/3-process-groups-jobs-and-sessions/)
144            .group_spawn()
145            .map_err(DispatcherError::ProcSpawnError)?;
146        let pid = child.id();
147
148        // output listeners
149        let max_len = 200; // TODO: Make configurable
150        let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
151
152        let buffer = output.clone();
153        let stdout = child.inner().stdout.take().unwrap();
154        let _stdout_handle = thread::spawn(move || {
155            output_listener(
156                BufReader::new(stdout),
157                job_id,
158                pid,
159                false,
160                buffer,
161                Some(channel),
162            )
163        });
164
165        let buffer = output.clone();
166        let stderr = child.inner().stderr.take().unwrap();
167        let _stderr_handle = thread::spawn(move || {
168            output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
169        });
170
171        let info = ProcInfo {
172            job_id,
173            pid,
174            cmd_args,
175            state: ProcStatus::Spawned,
176            start: Local::now(),
177            end: None,
178            cpu: 0.0,
179            memory: 0,
180            virtual_memory: 0,
181            total_written_bytes: 0,
182            written_bytes: 0,
183            total_read_bytes: 0,
184            read_bytes: 0,
185        };
186
187        let child_proc = Runner {
188            proc: child,
189            info,
190            restart_info,
191            user_terminated: false,
192            output,
193        };
194        Ok(child_proc)
195    }
196    pub fn update_proc_state(&mut self) -> &ProcInfo {
197        if self.info.end.is_none() {
198            self.info.state = match self.proc.try_wait() {
199                Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
200                Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
201                Ok(None) => ProcStatus::Running,
202                Err(e) => ProcStatus::Unknown(e.to_string()),
203            };
204        }
205        &self.info
206    }
207    pub fn is_running(&mut self) -> bool {
208        !self.update_proc_state().state.exited()
209    }
210    pub fn terminate(&mut self) -> Result<(), std::io::Error> {
211        info!("Terminating process {}", self.proc.id());
212        self.proc.kill()?;
213        Ok(())
214    }
215}
216
217impl Drop for Runner {
218    fn drop(&mut self) {
219        self.terminate().ok();
220    }
221}
222
223fn output_listener<R: Read>(
224    reader: BufReader<R>,
225    job_id: JobId,
226    pid: Pid,
227    is_stderr: bool,
228    buffer: Arc<Mutex<OutputBuffer>>,
229    channel: Option<mpsc::Sender<Pid>>,
230) {
231    reader.lines().map_while(Result::ok).for_each(|line| {
232        let ts = Local::now();
233        if is_stderr {
234            eprintln!("[{job_id}|{pid}] {line}");
235        } else {
236            println!("[[{job_id}|{pid}] {line}");
237        }
238        if let Ok(mut buffer) = buffer.lock() {
239            let entry = LogLine {
240                ts,
241                job_id,
242                pid,
243                is_stderr,
244                line,
245            };
246            buffer.push(entry);
247        }
248    });
249    if let Some(channel) = channel {
250        let ts = Local::now();
251        if let Ok(mut buffer) = buffer.lock() {
252            let entry = LogLine {
253                ts,
254                job_id,
255                pid,
256                is_stderr,
257                line: "<process terminated>".to_string(),
258            };
259            buffer.push(entry);
260        }
261        // Notify watcher
262        channel.send(pid).unwrap();
263    }
264}
265
266/// Current user
267pub fn get_user_name() -> Option<String> {
268    let system = System::new_with_specifics(
269        RefreshKind::new()
270            .with_processes(ProcessRefreshKind::new().with_user(UpdateKind::OnlyIfNotSet)),
271    );
272    let users = Users::new_with_refreshed_list();
273    let pid = process::id();
274    system
275        .process(sysinfo::Pid::from_u32(pid))
276        .and_then(|proc| {
277            proc.effective_user_id()
278                .or(proc.user_id())
279                .and_then(|uid| users.get_user_by_id(uid))
280                .map(|user| user.name().to_string())
281        })
282}