shell_compose/
runner.rs

1use crate::{DispatcherError, Formatter, JobId, Pid, RestartInfo};
2use chrono::{DateTime, Local};
3use log::info;
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6use std::io::{BufRead, BufReader, Read};
7use std::process::{self, Child, Command, Stdio};
8use std::sync::{mpsc, Arc, Mutex};
9use std::thread;
10use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind, Users};
11
12/// Child process controller
13pub struct Runner {
14    pub proc: Child,
15    pub info: ProcInfo,
16    pub restart_info: RestartInfo,
17    /// Flag set in stop/down command to prevent restart
18    pub user_terminated: bool,
19    pub output: Arc<Mutex<OutputBuffer>>,
20}
21
22/// Process information
23#[derive(Clone, Serialize, Deserialize, Debug)]
24pub struct ProcInfo {
25    pub job_id: JobId,
26    pub pid: Pid,
27    pub cmd_args: Vec<String>,
28    pub state: ProcStatus,
29    pub start: DateTime<Local>,
30    pub end: Option<DateTime<Local>>,
31    /// Total CPU usage (in %)
32    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage>
33    pub cpu: f32,
34    /// Memory usage (in bytes).
35    /// See <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.memory>
36    pub memory: u64,
37    /// Virtual memory usage (in bytes).
38    /// <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.virtual_memory>
39    pub virtual_memory: u64,
40    /// Total number of written bytes.
41    /// <https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.disk_usage>
42    pub total_written_bytes: u64,
43    /// Written bytes per second.
44    pub written_bytes: u64,
45    /// Total number of read bytes.
46    pub total_read_bytes: u64,
47    /// Read bytes per second.
48    pub read_bytes: u64,
49}
50
51impl ProcInfo {
52    pub fn program(&self) -> &str {
53        self.cmd_args.first().map(|s| s.as_str()).unwrap_or("")
54    }
55}
56
57#[derive(Clone, Serialize, Deserialize, Debug)]
58pub enum ProcStatus {
59    Spawned,
60    Running,
61    ExitOk,
62    ExitErr(i32),
63    Unknown(String),
64}
65
66impl ProcStatus {
67    pub fn exited(&self) -> bool {
68        matches!(self, ProcStatus::ExitOk | ProcStatus::ExitErr(_))
69    }
70}
71
72/// Log line from captured stdout/stderr output
73#[derive(Clone, Serialize, Deserialize, Debug)]
74pub struct LogLine {
75    pub ts: DateTime<Local>,
76    pub job_id: JobId,
77    pub pid: Pid,
78    pub line: String,
79    pub is_stderr: bool,
80}
81
82impl LogLine {
83    pub fn log(&self, formatter: &Formatter) {
84        let dt = self.ts.format("%F %T%.3f");
85        let job_id = self.job_id;
86        let pid = self.pid;
87        let line = &self.line;
88        let color = formatter.log_color_proc(job_id as usize, self.is_stderr);
89        println!("{color}{dt} [{job_id}|{pid}] {line}{color:#}");
90    }
91}
92
93/// Buffer for captured stdout/stderr output
94pub struct OutputBuffer {
95    lines: VecDeque<LogLine>,
96    max_len: Option<usize>,
97}
98
99impl OutputBuffer {
100    pub fn new(max_len: Option<usize>) -> Self {
101        OutputBuffer {
102            max_len,
103            lines: VecDeque::new(),
104        }
105    }
106    pub fn push(&mut self, line: LogLine) {
107        self.lines.push_back(line);
108        if let Some(max_len) = self.max_len {
109            if self.lines.len() > max_len {
110                let _ = self.lines.pop_front();
111            }
112        }
113    }
114    pub fn lines_since(&self, last_seen: &mut DateTime<Local>) -> impl Iterator<Item = &LogLine> {
115        let ts = *last_seen;
116        if let Some(entry) = self.lines.back() {
117            *last_seen = entry.ts;
118        }
119        self.lines.iter().skip_while(move |entry| ts >= entry.ts)
120    }
121}
122
123impl Runner {
124    pub fn spawn(
125        job_id: JobId,
126        args: &[String],
127        restart_info: RestartInfo,
128        channel: mpsc::Sender<Pid>,
129    ) -> Result<Self, DispatcherError> {
130        let cmd_args = args.to_vec();
131        let mut cmd = VecDeque::from(args.to_owned());
132        let Some(exe) = cmd.pop_front() else {
133            return Err(DispatcherError::EmptyProcCommandError);
134        };
135        // info!("Spawning {exe} {cmd:?}");
136
137        let mut child = Command::new(exe)
138            .args(cmd)
139            .stdin(Stdio::piped())
140            .stdout(Stdio::piped())
141            .stderr(Stdio::piped())
142            .spawn()
143            .map_err(DispatcherError::ProcSpawnError)?;
144        let pid = child.id();
145
146        // output listeners
147        let max_len = 200; // TODO: Make configurable
148        let output = Arc::new(Mutex::new(OutputBuffer::new(Some(max_len))));
149
150        let buffer = output.clone();
151        let stdout = child.stdout.take().unwrap();
152        let _stdout_handle = thread::spawn(move || {
153            output_listener(
154                BufReader::new(stdout),
155                job_id,
156                pid,
157                false,
158                buffer,
159                Some(channel),
160            )
161        });
162
163        let buffer = output.clone();
164        let stderr = child.stderr.take().unwrap();
165        let _stderr_handle = thread::spawn(move || {
166            output_listener(BufReader::new(stderr), job_id, pid, true, buffer, None)
167        });
168
169        let info = ProcInfo {
170            job_id,
171            pid,
172            cmd_args,
173            state: ProcStatus::Spawned,
174            start: Local::now(),
175            end: None,
176            cpu: 0.0,
177            memory: 0,
178            virtual_memory: 0,
179            total_written_bytes: 0,
180            written_bytes: 0,
181            total_read_bytes: 0,
182            read_bytes: 0,
183        };
184
185        let child_proc = Runner {
186            proc: child,
187            info,
188            restart_info,
189            user_terminated: false,
190            output,
191        };
192        Ok(child_proc)
193    }
194    pub fn update_proc_state(&mut self) -> &ProcInfo {
195        if self.info.end.is_none() {
196            self.info.state = match self.proc.try_wait() {
197                Ok(Some(status)) if status.success() => ProcStatus::ExitOk,
198                Ok(Some(status)) => ProcStatus::ExitErr(status.code().unwrap_or(0)),
199                Ok(None) => ProcStatus::Running,
200                Err(e) => ProcStatus::Unknown(e.to_string()),
201            };
202        }
203        &self.info
204    }
205    pub fn is_running(&mut self) -> bool {
206        !self.update_proc_state().state.exited()
207    }
208    pub fn terminate(&mut self) -> Result<(), std::io::Error> {
209        if self.info.program() == "just" {
210            // just does not propagate signals, so we have to kill its child process
211            let just_pid = self.proc.id() as usize;
212            let system = System::new_with_specifics(
213                RefreshKind::new().with_processes(ProcessRefreshKind::new()),
214            );
215            if let Some((pid, process)) = system.processes().iter().find(|(_pid, process)| {
216                process.parent().unwrap_or(0.into()) == just_pid.into()
217                    && process.name() != "ctrl-c"
218            }) {
219                info!("Terminating process {pid} (parent process {just_pid})");
220                process.kill(); // process.kill_with(Signal::Term)
221            }
222            // In an interactive terminal session, sending Ctrl-C terminates the running process.
223            // let mut stdin = self.proc.stdin.take().unwrap();
224            // stdin.write_all(&[3])?;
225        } else {
226            info!("Terminating process {}", self.proc.id());
227            self.proc.kill()?;
228        }
229        Ok(())
230    }
231}
232
233impl Drop for Runner {
234    fn drop(&mut self) {
235        self.terminate().ok();
236    }
237}
238
239fn output_listener<R: Read>(
240    reader: BufReader<R>,
241    job_id: JobId,
242    pid: Pid,
243    is_stderr: bool,
244    buffer: Arc<Mutex<OutputBuffer>>,
245    channel: Option<mpsc::Sender<Pid>>,
246) {
247    reader.lines().map_while(Result::ok).for_each(|line| {
248        let ts = Local::now();
249        if is_stderr {
250            eprintln!("[{job_id}|{pid}] {line}");
251        } else {
252            println!("[[{job_id}|{pid}] {line}");
253        }
254        if let Ok(mut buffer) = buffer.lock() {
255            let entry = LogLine {
256                ts,
257                job_id,
258                pid,
259                is_stderr,
260                line,
261            };
262            buffer.push(entry);
263        }
264    });
265    if let Some(channel) = channel {
266        let ts = Local::now();
267        if let Ok(mut buffer) = buffer.lock() {
268            let entry = LogLine {
269                ts,
270                job_id,
271                pid,
272                is_stderr,
273                line: "<process terminated>".to_string(),
274            };
275            buffer.push(entry);
276        }
277        // Notify watcher
278        channel.send(pid).unwrap();
279    }
280}
281
282/// Current user
283pub fn get_user_name() -> Option<String> {
284    let system = System::new_with_specifics(
285        RefreshKind::new()
286            .with_processes(ProcessRefreshKind::new().with_user(UpdateKind::OnlyIfNotSet)),
287    );
288    let users = Users::new_with_refreshed_list();
289    let pid = process::id();
290    system
291        .process(sysinfo::Pid::from_u32(pid))
292        .and_then(|proc| {
293            proc.effective_user_id()
294                .or(proc.user_id())
295                .and_then(|uid| users.get_user_by_id(uid))
296                .map(|user| user.name().to_string())
297        })
298}