run_hive

Function run_hive 

Source
pub async fn run_hive<F, T: 'static + Send>(
    client_event_notifier: Sender<RunnerEvent<T>>,
    f: F,
) -> (JoinHandle<()>, Hive)
where F: FnMut(&str, u32, &StdType) -> Option<T> + Send + Copy + 'static,
Expand description

This is the most important part of the crate and the entry point you have to use. It creates an environment where your commands can live, enjoy and die: the whole lifecycle. You are the queen of this hive, so you decide which commands are created, how many, and when they die. All data that comes from the commands is sent to the provided client_event_notifier.

 use anyhow::Result;
 use tokio::process::Command;
 use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};
 
 // The value your parser builds from a single line of command output,
 // e.g. "GET /index.html 200 Mozilla/5.0".
 #[derive(Debug)]
 struct Request {
     // Request method, e.g. "GET"
     method: String,
     // Response status, e.g. "200"
     status: String,
     // Client user agent, e.g. "Mozilla/5.0"
     user_agent: String,
     // Requested path, e.g. "/index.html"
     url: String,
 }
 
 // Example entry point: establishes a hive, spawns nginx under the id "my-nginx",
 // prints every event received on the channel and halts nginx when the interval fires.
 #[tokio::main]
 async fn main() -> Result<()> {
     // Create the communication channel
     let (tx, rx) = flume::unbounded();
 
     // A command you want to run.
     let mut cmd = Command::new("/usr/bin/nginx");
     cmd.arg("-c").arg("/opt/nginx/nginx.conf");
 
     // Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
     let parser = move |line: &str, _pid: u32, _std_type: &StdType| -> Option<Request> {
         // This nginx output is like "GET /index.html 200 Mozilla/5.0"
         // I'm interested only on GET and POST requests; any other output is ignored
         if !(line.starts_with("GET") || line.starts_with("POST")) {
             return None;
         }
         // Pull the four fields one by one: a truncated line yields None
         // instead of panicking on an out-of-bounds index.
         let mut parts = line.split(' ');
         let method = parts.next()?.to_owned();
         let url = parts.next()?.to_owned();
         let status = parts.next()?.to_owned();
         let user_agent = parts.next()?.to_owned();
         Some(Request {
             method,
             status,
             user_agent,
             url,
         })
     };
 
     // Establish a hive
     let (_, mut hive) = run_hive(tx.clone(), parser).await;
     // Spawn the nginx command
     hive.spawn("my-nginx", cmd).await?;
 
     // I will use this interval to kill the nginx in 15 seconds
     let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
     // The first tick of a tokio interval completes immediately; consume it here
     // so the tick awaited inside the loop fires after the full period.
     interval.tick().await;
 
     // Wait for the updates
     let mut keep_running = true;
     while keep_running {
         tokio::select! {
             message = rx.recv_async() => {
                 // A successfully received message is any kind of `RunnerEvent`
                 match message {
                     Ok(RunnerStartEvent(event)) => {
                         println!("Process with id {} started", event.id)
                     },
                     Ok(RunnerStopEvent(event)) => {
                         println!("Process with pid {} died", event.pid)
                     },
                     Ok(RunnerStatusEvent(event)) => {
                         println!("New message from {}: {:?}", event.id, event.data)
                     },
                     Ok(RunnerLogEvent(event)) => {
                         println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
                     },
                     Err(err) => {
                         // Receiving failed: report it and leave the loop
                         println!("ERROR {:?}", err);
                         keep_running = false;
                     }
                 }
             },
             _ = interval.tick() => {
                 // NOTE(review): the interval keeps ticking, so this branch runs again every
                 // period and the loop only ends on a receive error - confirm that halting an
                 // already stopped id is acceptable, or stop the loop here.
                 // List all running processes
                 let proc_list = hive.processes_info().await;
                 println!("Before die: {:?}", proc_list);
                 println!("DIE NGINX DIE HAHAHAAH");
                 hive.halt("my-nginx").await?;
                 let proc_list = hive.processes_info().await;
                 println!("After die: {:?}", proc_list);
             }
         }
     }
 
     // Kill all running processes before exit
     hive.disband().await?;
     Ok(())
 }