// swarm_commander/runner.rs

1use crate::event::{RunnerEvent, RunnerStopEvent, RunnerStartEvent, StatusEvent, RunnerLogEvent, StdType};
2use std::collections::HashMap;
3use tokio::process::{Command, Child};
4use anyhow::Result;
5use tokio::io::{BufReader, AsyncBufReadExt};
6use std::process::Stdio;
7use log::{info, warn, debug, error};
8use std::sync::{Arc};
9use futures::lock::Mutex;
10use chrono::prelude::{Utc, DateTime};
11
12
13/// This function controls when the program exited (friendly or crashed) and is able to kill the command when needed. It is like an async wrapper on top of a command.
14pub async fn monitor_process<T>(
15    killer: flume::Receiver<bool>, 
16    event_sender: flume::Sender<RunnerEvent<T>>, 
17    process_clean: flume::Sender<String>, 
18    process_id: String, 
19    mut child: Child, 
20    processes_info: Arc<futures::lock::Mutex<HashMap<String, ProcInfo>>>
21) {
22    info!("Monitor for {} started", process_id);
23    let mut keep_running = true;
24    while keep_running {
25        tokio::select! {
26            res = child.wait() => {
27                info!("Process {} exited", process_id);
28                let status_code = match res {
29                    Ok(res) => {
30                        if let Some(code) = res.code() {
31                            if code != 0 {
32                                warn!("The process {} exited with error code {}, please check the logs", process_id, code);
33                            }
34                            Some(code)
35                        } else {
36                            None
37                        }
38                    }, 
39                    Err(error) => {
40                        error!("Error when exiting {}: {:?}", process_id, error);
41                        None
42                    }
43                };
44                keep_running = false;
45                let processes = processes_info.lock().await;
46                if let Some(proc_info) =processes.get(&process_id) {
47                    event_sender.send_async(RunnerEvent::RunnerStopEvent(RunnerStopEvent{
48                        info: ProcInfo { 
49                            command: proc_info.command.to_owned(), 
50                            args: proc_info.args.to_owned(), 
51                            pid: proc_info.pid, 
52                            start_time: proc_info.start_time.to_owned()
53                        },
54                        code: status_code,
55                        success: status_code.is_some(),
56                        id: process_id.to_owned(),
57                    })).await.unwrap()
58                } else {
59                    error!("No process information to send");
60                }
61                process_clean.send_async(process_id.to_owned()).await.unwrap();
62
63            },
64            _ = killer.recv_async() => {
65                info!("Killing {:?}", process_id);
66                if child.kill().await.is_err() {
67                    let processes = processes_info.lock().await;
68                    if let Some(proc_info) =processes.get(&process_id) {
69                        event_sender.send_async(RunnerEvent::RunnerStopEvent(RunnerStopEvent{
70                            info: ProcInfo { 
71                                command: proc_info.command.to_owned(), 
72                                args: proc_info.args.to_owned(), 
73                                pid: proc_info.pid, 
74                                start_time: proc_info.start_time.to_owned()
75                            },
76                            code: None,
77                            success: false,
78                            id: process_id.to_owned(),
79                        })).await.unwrap()
80                    } else {
81                        error!("No process information to send");
82                    }
83                }
84            }
85        };
86    }
87    info!("Monitor for {} end", process_id);
88}
89
/// Public information about a running process
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct ProcInfo {
    /// The command executed (program path or name as given to `Command::new`)
    pub command: String,
    /// The command arguments, in the order they were passed
    pub args: Vec<String>,
    /// The OS process id assigned at spawn time
    pub pid: u32,
    /// When the process started (UTC, recorded just after a successful spawn)
    pub start_time: DateTime<Utc>
}
103
104
105/// The most important part and the thing that you have to use. It creates an enviroment where your command can live, enjoy and die. The lifecycle. You are the queen of this hive so you decide what commands are
106/// created, how many and when they will die. All data that comes from the commands will be sent to the `client_event_notifier` provided.
107///
108///```
109/// use anyhow::Result;
110/// use tokio::process::Command;
111/// use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};
112/// 
113/// // This is what you parse will build from a command output line
114/// #[derive(Debug)]
115/// struct Request {
116///     method: String,
117///     status: String,
118///     user_agent: String,
119///     url: String
120///  }
121/// 
122/// #[tokio::main]
123/// async fn main() -> Result<()> {
124///     // Create the communication channel
125///     let (tx, rx) = flume::unbounded();
126/// 
127///     // A command you want to run.
128///     let mut cmd = Command::new("/usr/bin/nginx");
129///     cmd.arg("-c").arg("/opt/nginx/nginx.conf");
130///         
131///     // Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
132///     let parser = move |line: &str, pid: u32, std_type: &StdType| -> Option<Request> {
133///         // This nginx output is like "GET /index.html 200 Mozilla/5.0"
134///         if line.starts_with("GET") || line.starts_with("POST") {
135///             // I'm interested only on GET and POST requests
136///             let parts = line.split(" ").collect::<Vec<&str>>();
137///             Some(Request {
138///             method: parts[0].to_owned(),
139///             status: parts[2].to_owned(),
140///             user_agent: parts[3].to_owned(),
141///             url: parts[1].to_owned(),
142///             })
143///         } else {
144///             // Other kind of request or any other output that I'm ignoring
145///             None
146///         }
147///     };
148///   
149///     // Establish a hive
150///     let (_, mut hive) = run_hive(tx.clone(), parser).await;
151///     // Spawn the nginx command
152///     hive.spawn("my-nginx", cmd).await?;
153///   
154///     // I will use this interval to kill the nginx in 15 seconds
155///     let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
156///     interval.tick().await;
157///     
158///     // Wait for the updates
159///     let mut keep_running = true;
160///     while keep_running {
161///         tokio::select! {
162///             message = rx.recv_async() => {
163///                 match message {
164///                     Ok(message) => {
165///                         // message is any kind of `RunnerEvent`
166///                         match message {
167///                             RunnerStartEvent(event) => {
168///                                 println!("Process with id {} started", event.id)
169///                             }, 
170///                             RunnerStopEvent(event) => {
171///                                 println!("Process with pid {} died", event.pid)
172///                             },
173///                             RunnerStatusEvent(event) => {
174///                                 println!("New message from {}: {:?}", event.id, event.data)
175///                             },
176///                             RunnerLogEvent(event) => {
177///                                 println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
178///                             }
179///                         }
180///                     },
181///                     Err(err) => {
182///                         println!("ERROR {:?}", err);
183///                         keep_running = false;
184///                     }
185///                 }
186///                 
187///             },
188///             _ = interval.tick() => {
189///                 // List all running processes
190///                 let proc_list = hive.processes_info().await;
191///                 println("Before die: {:?}", proc_list);
192///                 println!("DIE NGINX DIE HAHAHAAH");
193///                 hive.halt("my-nginx").await?;
194///                 let proc_list = hive.processes_info().await;
195///                 println("After die: {:?}", proc_list);
196///             }
197///         }
198///     }
199///     
200///     // Kill all running processes before exit
201///     hive.disband().await?;
202///     Ok(())
203/// }
204///```
205pub async fn run_hive<F, T: 'static + std::marker::Send>(client_event_notifier: flume::Sender<RunnerEvent<T>>, f: F) -> (tokio::task::JoinHandle<()>, Hive) 
206where F: FnMut(&str, u32, &StdType) -> Option<T> + std::marker::Send + Copy + 'static
207{
208    let mut processes: HashMap<String, flume::Sender<bool>> = HashMap::new();
209    let (termination_notifier, termination_receiver) = flume::unbounded::<String>();
210    let (kill_request_sender, kill_request_receiver) = flume::unbounded::<String>();
211    let (run_request_sender, run_request_receiver) = flume::unbounded::<(String, Command)>();
212    let (disband_sender, disband_receiver) = flume::unbounded();
213
214    let info_proc = Arc::new(Mutex::new(HashMap::new()));
215    let hive_info_proc = info_proc.clone();
216
217    let join_handle = tokio::spawn(async move {
218        let mut run = true;
219        let mut shutting_down = false;
220        while run {
221            tokio::select!(
222                id = termination_receiver.recv_async() => {
223                    if let Ok(id) = id {
224                        info!("Cleaning process {}", id);
225                        processes.remove(&id);
226                        info_proc.lock().await.remove(&id);
227
228                        if shutting_down {
229                            let remaining = processes.len();
230                            if remaining == 0 {
231                                info!("No processes running, hive disbanded");
232                                run = false;
233                            } else {
234                                info!("{remaining} remaining processes until disband");
235                            }
236                        }
237                    }
238                },
239                id = kill_request_receiver.recv_async() => {
240                    if let Ok(id) = id {
241                        if let Some(process_killer) = processes.get(&id) {
242                            info!("Killing {}", id);
243                            if let Err(error) = process_killer.send_async(true).await {
244                                error!("Error when killing {:?}", error);
245                            }
246                        } else {
247                            warn!("Trying to kill a missing process {}", id);
248                        }
249                    }
250                    
251                },
252                process_data = run_request_receiver.recv_async() => {
253                    if let Ok((id, mut cmd)) = process_data {
254                        info!("Starting {:?}", cmd);
255                        cmd.stderr(Stdio::piped());
256                        cmd.stdout(Stdio::piped());
257    
258                        match cmd.spawn() {
259                            Ok(mut child) => {
260                                let pid = child.id().unwrap();
261                                client_event_notifier.send_async(RunnerEvent::RunnerStartEvent(RunnerStartEvent{
262                                    success: true,
263                                    pid,
264                                    id: id.to_owned()
265                                })).await.unwrap();
266                                let stderr = child.stderr.take().unwrap();
267                                let reader_err = BufReader::new(stderr);
268                                let stdout = child.stdout.take().unwrap();
269                                let reader_out = BufReader::new(stdout);
270                                
271                                let (stop_sender, stop_receiver) = flume::bounded(1);
272                                processes.insert(id.to_owned(), stop_sender);
273                                let std_command = cmd.as_std();
274                                let args = std_command.get_args().into_iter().map(|arg| arg.to_str().unwrap().to_owned()).collect::<Vec<String>>();
275                                let proc_info = ProcInfo {
276                                    command: std_command.get_program().to_str().unwrap().to_owned(),
277                                    args,
278                                    start_time: Utc::now(),
279                                    pid,
280                                };
281                                // Create a copy just to make the things easier
282                                /*let monitor_proc_info = ProcInfo { 
283                                    command: proc_info.command.to_owned(), 
284                                    args: proc_info.args.to_owned(), 
285                                    pid: proc_info.pid, 
286                                    start_time: proc_info.start_time.to_owned()
287                                };*/
288                                info_proc.lock().await.insert(id.to_owned(), proc_info);
289                                tokio::spawn(monitor_process(stop_receiver, client_event_notifier.clone(), termination_notifier.clone(), id.to_owned(), child, info_proc.clone()));
290                                tokio::spawn(std_reader(reader_out, client_event_notifier.clone(),id.to_owned(), pid, StdType::Out, f));
291                                tokio::spawn(std_reader(reader_err, client_event_notifier.clone(),id.to_owned(), pid, StdType::Err, f));
292                            },
293                            Err(err) => error!("{:?}", err)
294                        };
295                    } 
296                },
297                _ = disband_receiver.recv_async() => {
298                    info!("Starting hive disband");
299                    shutting_down = true;
300                    if !processes.is_empty() {
301                        for process_killer in processes.values_mut() {
302                            if let Err(error) = process_killer.send_async(true).await {
303                                error!("Error when killing {:?}", error);
304                            }
305                        }
306                    } else {
307                        // No processes pending to kill, we can exit
308                        run = false;
309                    }
310                    
311                }
312            );
313        }
314        info!("Hive disbanded");
315    });
316
317    (join_handle, Hive{
318        kill_request_sender,
319        run_request_sender,
320        disband_sender,
321        processes_info: hive_info_proc
322    })
323
324}
325
/// The place where all of your commands are living
pub struct Hive {
    // Queue of process ids to kill, drained by the hive task
    kill_request_sender: flume::Sender<String>,
    // Queue of (id, command) pairs to spawn, drained by the hive task
    run_request_sender: flume::Sender<(String, Command)>,
    // Any message here triggers the shutdown of every running process
    disband_sender: flume::Sender<bool>,
    // Registry of running processes, shared with the hive task and the monitors
    processes_info: Arc<Mutex<HashMap<String, ProcInfo>>>
}
333
334
335impl Hive {
336    /// Stop, kill, murder... Just when you want to stop a command
337    pub async fn halt(&mut self, id: &str) -> Result<()> {
338        Ok(self.kill_request_sender.send_async(id.to_owned()).await?)
339    }
340    /// Create a new command that will live in the hive and work for you until his death
341    pub async fn spawn(&mut self, id: &str, cmd: Command) -> Result<()> {
342        debug!("Spawn {}", id);
343        Ok(self.run_request_sender.send_async((id.to_string(), cmd)).await?)
344    }
345    /// Disband the swarm. Stop all processes and tasks
346    pub async fn disband(&mut self) -> Result<()> {
347        Ok(self.disband_sender.send_async(true).await?)
348    }
349
350    /// Information of all processes. The returned data is cloned from the
351    /// internal registry so don't call it so often if your process list is huge
352    pub async fn processes_info(&self) -> HashMap<String, ProcInfo> {
353        self.processes_info.lock().await.clone()
354    }
355
356}
357
358/// The stdout and stderr reader. It reads asynchronously line by line and provides to your parser each one.
359pub async fn std_reader<F, T>(mut reader: BufReader<impl tokio::io::AsyncRead + Unpin>, task_sender: flume::Sender<RunnerEvent<T>>, id: String, pid: u32, std_type: StdType, mut f: F) 
360where F: FnMut(&str, u32, &StdType) -> Option<T> + std::marker::Send + Copy + 'static
361{
362    debug!("Std reader started");
363    let mut buf = Vec::<u8>::new();
364    let mut log = std::collections::VecDeque::<String>::with_capacity(10);
365    let mut keep_runnig = true;
366    while reader.read_until(b'\n', &mut buf).await.unwrap() != 0 && keep_runnig {
367        let line = String::from_utf8(buf.to_owned()).unwrap();
368        log.push_front(line.to_owned());
369        log.truncate(10);
370        if let Some(m) = f(&line, pid, &std_type) {
371            let event = StatusEvent{
372                id: id.to_owned(),
373                data: m
374            };
375            if let Err(error) = task_sender.send_async(RunnerEvent::RunnerStatusEvent(event)).await {
376                if task_sender.is_disconnected() {
377                    error!("Event sender for {} disconnected, closing reader", id);
378                    keep_runnig = false;
379                } else {
380                    error!("Error when sending event: {:?}", error);
381                }
382            }
383        }
384        buf.clear();
385    }
386    debug!("Std reader closed");
387    debug!("Last lines {:?}", log);
388    if !keep_runnig {
389        warn!("Reader exited because of an error, please check the logs");
390    }
391
392    if let Err(error) = task_sender.send_async(RunnerEvent::RunnerLogEvent(RunnerLogEvent{id: id.to_owned(), log, std: std_type})).await {
393        error!("Cannot send log of {}: {:?}", id, error);
394    }
395}