//! swarm_commander/runner.rs
1use crate::event::{RunnerEvent, RunnerStopEvent, RunnerStartEvent, StatusEvent, RunnerLogEvent, StdType};
2use std::collections::HashMap;
3use tokio::process::{Command, Child};
4use anyhow::Result;
5use tokio::io::{BufReader, AsyncBufReadExt};
6use std::process::Stdio;
7use log::{info, warn, debug, error};
8use std::sync::{Arc};
9use futures::lock::Mutex;
10use chrono::prelude::{Utc, DateTime};
11
12
13/// This function controls when the program exited (friendly or crashed) and is able to kill the command when needed. It is like an async wrapper on top of a command.
14pub async fn monitor_process<T>(
15 killer: flume::Receiver<bool>,
16 event_sender: flume::Sender<RunnerEvent<T>>,
17 process_clean: flume::Sender<String>,
18 process_id: String,
19 mut child: Child,
20 processes_info: Arc<futures::lock::Mutex<HashMap<String, ProcInfo>>>
21) {
22 info!("Monitor for {} started", process_id);
23 let mut keep_running = true;
24 while keep_running {
25 tokio::select! {
26 res = child.wait() => {
27 info!("Process {} exited", process_id);
28 let status_code = match res {
29 Ok(res) => {
30 if let Some(code) = res.code() {
31 if code != 0 {
32 warn!("The process {} exited with error code {}, please check the logs", process_id, code);
33 }
34 Some(code)
35 } else {
36 None
37 }
38 },
39 Err(error) => {
40 error!("Error when exiting {}: {:?}", process_id, error);
41 None
42 }
43 };
44 keep_running = false;
45 let processes = processes_info.lock().await;
46 if let Some(proc_info) =processes.get(&process_id) {
47 event_sender.send_async(RunnerEvent::RunnerStopEvent(RunnerStopEvent{
48 info: ProcInfo {
49 command: proc_info.command.to_owned(),
50 args: proc_info.args.to_owned(),
51 pid: proc_info.pid,
52 start_time: proc_info.start_time.to_owned()
53 },
54 code: status_code,
55 success: status_code.is_some(),
56 id: process_id.to_owned(),
57 })).await.unwrap()
58 } else {
59 error!("No process information to send");
60 }
61 process_clean.send_async(process_id.to_owned()).await.unwrap();
62
63 },
64 _ = killer.recv_async() => {
65 info!("Killing {:?}", process_id);
66 if child.kill().await.is_err() {
67 let processes = processes_info.lock().await;
68 if let Some(proc_info) =processes.get(&process_id) {
69 event_sender.send_async(RunnerEvent::RunnerStopEvent(RunnerStopEvent{
70 info: ProcInfo {
71 command: proc_info.command.to_owned(),
72 args: proc_info.args.to_owned(),
73 pid: proc_info.pid,
74 start_time: proc_info.start_time.to_owned()
75 },
76 code: None,
77 success: false,
78 id: process_id.to_owned(),
79 })).await.unwrap()
80 } else {
81 error!("No process information to send");
82 }
83 }
84 }
85 };
86 }
87 info!("Monitor for {} end", process_id);
88}
89
/// Public information about a running process, as exposed by
/// `Hive::processes_info` and embedded in stop events.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct ProcInfo {
    /// The program that was executed (as handed to `Command`)
    pub command: String,
    /// The command-line arguments the process was started with
    pub args: Vec<String>,
    /// The OS process id (pid) of the spawned child
    pub pid: u32,
    /// UTC timestamp taken right after the process was spawned
    pub start_time: DateTime<Utc>
}
103
104
/// The most important part and the thing that you have to use. It creates an environment where your command can live, enjoy and die. The lifecycle. You are the queen of this hive so you decide what commands are
106/// created, how many and when they will die. All data that comes from the commands will be sent to the `client_event_notifier` provided.
107///
108///```
109/// use anyhow::Result;
110/// use tokio::process::Command;
111/// use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};
112///
/// // This is what your parser will build from a command output line
114/// #[derive(Debug)]
115/// struct Request {
116/// method: String,
117/// status: String,
118/// user_agent: String,
119/// url: String
120/// }
121///
122/// #[tokio::main]
123/// async fn main() -> Result<()> {
124/// // Create the communication channel
125/// let (tx, rx) = flume::unbounded();
126///
127/// // A command you want to run.
128/// let mut cmd = Command::new("/usr/bin/nginx");
129/// cmd.arg("-c").arg("/opt/nginx/nginx.conf");
130///
131/// // Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
132/// let parser = move |line: &str, pid: u32, std_type: &StdType| -> Option<Request> {
133/// // This nginx output is like "GET /index.html 200 Mozilla/5.0"
134/// if line.starts_with("GET") || line.starts_with("POST") {
135/// // I'm interested only on GET and POST requests
136/// let parts = line.split(" ").collect::<Vec<&str>>();
137/// Some(Request {
138/// method: parts[0].to_owned(),
139/// status: parts[2].to_owned(),
140/// user_agent: parts[3].to_owned(),
141/// url: parts[1].to_owned(),
142/// })
143/// } else {
144/// // Other kind of request or any other output that I'm ignoring
145/// None
146/// }
147/// };
148///
149/// // Establish a hive
150/// let (_, mut hive) = run_hive(tx.clone(), parser).await;
151/// // Spawn the nginx command
152/// hive.spawn("my-nginx", cmd).await?;
153///
154/// // I will use this interval to kill the nginx in 15 seconds
155/// let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
156/// interval.tick().await;
157///
158/// // Wait for the updates
159/// let mut keep_running = true;
160/// while keep_running {
161/// tokio::select! {
162/// message = rx.recv_async() => {
163/// match message {
164/// Ok(message) => {
165/// // message is any kind of `RunnerEvent`
166/// match message {
167/// RunnerStartEvent(event) => {
168/// println!("Process with id {} started", event.id)
169/// },
170/// RunnerStopEvent(event) => {
/// println!("Process with pid {} died", event.info.pid)
172/// },
173/// RunnerStatusEvent(event) => {
174/// println!("New message from {}: {:?}", event.id, event.data)
175/// },
176/// RunnerLogEvent(event) => {
177/// println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
178/// }
179/// }
180/// },
181/// Err(err) => {
182/// println!("ERROR {:?}", err);
183/// keep_running = false;
184/// }
185/// }
186///
187/// },
188/// _ = interval.tick() => {
189/// // List all running processes
190/// let proc_list = hive.processes_info().await;
/// println!("Before die: {:?}", proc_list);
192/// println!("DIE NGINX DIE HAHAHAAH");
193/// hive.halt("my-nginx").await?;
194/// let proc_list = hive.processes_info().await;
/// println!("After die: {:?}", proc_list);
196/// }
197/// }
198/// }
199///
200/// // Kill all running processes before exit
201/// hive.disband().await?;
202/// Ok(())
203/// }
204///```
205pub async fn run_hive<F, T: 'static + std::marker::Send>(client_event_notifier: flume::Sender<RunnerEvent<T>>, f: F) -> (tokio::task::JoinHandle<()>, Hive)
206where F: FnMut(&str, u32, &StdType) -> Option<T> + std::marker::Send + Copy + 'static
207{
208 let mut processes: HashMap<String, flume::Sender<bool>> = HashMap::new();
209 let (termination_notifier, termination_receiver) = flume::unbounded::<String>();
210 let (kill_request_sender, kill_request_receiver) = flume::unbounded::<String>();
211 let (run_request_sender, run_request_receiver) = flume::unbounded::<(String, Command)>();
212 let (disband_sender, disband_receiver) = flume::unbounded();
213
214 let info_proc = Arc::new(Mutex::new(HashMap::new()));
215 let hive_info_proc = info_proc.clone();
216
217 let join_handle = tokio::spawn(async move {
218 let mut run = true;
219 let mut shutting_down = false;
220 while run {
221 tokio::select!(
222 id = termination_receiver.recv_async() => {
223 if let Ok(id) = id {
224 info!("Cleaning process {}", id);
225 processes.remove(&id);
226 info_proc.lock().await.remove(&id);
227
228 if shutting_down {
229 let remaining = processes.len();
230 if remaining == 0 {
231 info!("No processes running, hive disbanded");
232 run = false;
233 } else {
234 info!("{remaining} remaining processes until disband");
235 }
236 }
237 }
238 },
239 id = kill_request_receiver.recv_async() => {
240 if let Ok(id) = id {
241 if let Some(process_killer) = processes.get(&id) {
242 info!("Killing {}", id);
243 if let Err(error) = process_killer.send_async(true).await {
244 error!("Error when killing {:?}", error);
245 }
246 } else {
247 warn!("Trying to kill a missing process {}", id);
248 }
249 }
250
251 },
252 process_data = run_request_receiver.recv_async() => {
253 if let Ok((id, mut cmd)) = process_data {
254 info!("Starting {:?}", cmd);
255 cmd.stderr(Stdio::piped());
256 cmd.stdout(Stdio::piped());
257
258 match cmd.spawn() {
259 Ok(mut child) => {
260 let pid = child.id().unwrap();
261 client_event_notifier.send_async(RunnerEvent::RunnerStartEvent(RunnerStartEvent{
262 success: true,
263 pid,
264 id: id.to_owned()
265 })).await.unwrap();
266 let stderr = child.stderr.take().unwrap();
267 let reader_err = BufReader::new(stderr);
268 let stdout = child.stdout.take().unwrap();
269 let reader_out = BufReader::new(stdout);
270
271 let (stop_sender, stop_receiver) = flume::bounded(1);
272 processes.insert(id.to_owned(), stop_sender);
273 let std_command = cmd.as_std();
274 let args = std_command.get_args().into_iter().map(|arg| arg.to_str().unwrap().to_owned()).collect::<Vec<String>>();
275 let proc_info = ProcInfo {
276 command: std_command.get_program().to_str().unwrap().to_owned(),
277 args,
278 start_time: Utc::now(),
279 pid,
280 };
281 // Create a copy just to make the things easier
282 /*let monitor_proc_info = ProcInfo {
283 command: proc_info.command.to_owned(),
284 args: proc_info.args.to_owned(),
285 pid: proc_info.pid,
286 start_time: proc_info.start_time.to_owned()
287 };*/
288 info_proc.lock().await.insert(id.to_owned(), proc_info);
289 tokio::spawn(monitor_process(stop_receiver, client_event_notifier.clone(), termination_notifier.clone(), id.to_owned(), child, info_proc.clone()));
290 tokio::spawn(std_reader(reader_out, client_event_notifier.clone(),id.to_owned(), pid, StdType::Out, f));
291 tokio::spawn(std_reader(reader_err, client_event_notifier.clone(),id.to_owned(), pid, StdType::Err, f));
292 },
293 Err(err) => error!("{:?}", err)
294 };
295 }
296 },
297 _ = disband_receiver.recv_async() => {
298 info!("Starting hive disband");
299 shutting_down = true;
300 if !processes.is_empty() {
301 for process_killer in processes.values_mut() {
302 if let Err(error) = process_killer.send_async(true).await {
303 error!("Error when killing {:?}", error);
304 }
305 }
306 } else {
307 // No processes pending to kill, we can exit
308 run = false;
309 }
310
311 }
312 );
313 }
314 info!("Hive disbanded");
315 });
316
317 (join_handle, Hive{
318 kill_request_sender,
319 run_request_sender,
320 disband_sender,
321 processes_info: hive_info_proc
322 })
323
324}
325
326/// The place where all of your commands are living
/// The place where all of your commands are living
pub struct Hive {
    // Requests that the process with the given id be killed (see `halt`)
    kill_request_sender: flume::Sender<String>,
    // Requests that a new (id, command) pair be spawned (see `spawn`)
    run_request_sender: flume::Sender<(String, Command)>,
    // Requests a full shutdown of the hive (see `disband`)
    disband_sender: flume::Sender<bool>,
    // Shared live registry of per-process info; cloned by `processes_info`
    processes_info: Arc<Mutex<HashMap<String, ProcInfo>>>
}
333
334
335impl Hive {
336 /// Stop, kill, murder... Just when you want to stop a command
337 pub async fn halt(&mut self, id: &str) -> Result<()> {
338 Ok(self.kill_request_sender.send_async(id.to_owned()).await?)
339 }
340 /// Create a new command that will live in the hive and work for you until his death
341 pub async fn spawn(&mut self, id: &str, cmd: Command) -> Result<()> {
342 debug!("Spawn {}", id);
343 Ok(self.run_request_sender.send_async((id.to_string(), cmd)).await?)
344 }
345 /// Disband the swarm. Stop all processes and tasks
346 pub async fn disband(&mut self) -> Result<()> {
347 Ok(self.disband_sender.send_async(true).await?)
348 }
349
350 /// Information of all processes. The returned data is cloned from the
351 /// internal registry so don't call it so often if your process list is huge
352 pub async fn processes_info(&self) -> HashMap<String, ProcInfo> {
353 self.processes_info.lock().await.clone()
354 }
355
356}
357
358/// The stdout and stderr reader. It reads asynchronously line by line and provides to your parser each one.
359pub async fn std_reader<F, T>(mut reader: BufReader<impl tokio::io::AsyncRead + Unpin>, task_sender: flume::Sender<RunnerEvent<T>>, id: String, pid: u32, std_type: StdType, mut f: F)
360where F: FnMut(&str, u32, &StdType) -> Option<T> + std::marker::Send + Copy + 'static
361{
362 debug!("Std reader started");
363 let mut buf = Vec::<u8>::new();
364 let mut log = std::collections::VecDeque::<String>::with_capacity(10);
365 let mut keep_runnig = true;
366 while reader.read_until(b'\n', &mut buf).await.unwrap() != 0 && keep_runnig {
367 let line = String::from_utf8(buf.to_owned()).unwrap();
368 log.push_front(line.to_owned());
369 log.truncate(10);
370 if let Some(m) = f(&line, pid, &std_type) {
371 let event = StatusEvent{
372 id: id.to_owned(),
373 data: m
374 };
375 if let Err(error) = task_sender.send_async(RunnerEvent::RunnerStatusEvent(event)).await {
376 if task_sender.is_disconnected() {
377 error!("Event sender for {} disconnected, closing reader", id);
378 keep_runnig = false;
379 } else {
380 error!("Error when sending event: {:?}", error);
381 }
382 }
383 }
384 buf.clear();
385 }
386 debug!("Std reader closed");
387 debug!("Last lines {:?}", log);
388 if !keep_runnig {
389 warn!("Reader exited because of an error, please check the logs");
390 }
391
392 if let Err(error) = task_sender.send_async(RunnerEvent::RunnerLogEvent(RunnerLogEvent{id: id.to_owned(), log, std: std_type})).await {
393 error!("Cannot send log of {}: {:?}", id, error);
394 }
395}