1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
extern crate time;
extern crate rustc_serialize;

use std::io::prelude::*;
use std::os::unix::prelude::*;
use std::io::BufReader;

use std::process::ExitStatus;
use std::io::Error;

use std::fs::File;
use std::fs::create_dir_all;
use std::path::Path;
use std::thread;
use std::convert::AsRef;
use std::process::{Command, Stdio};
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender, channel};

use rustc_serialize::json;

use data_format::{LogSource, TimestampedLine};

pub struct Writer;

impl Writer {
  fn run(&self, destination_file:String,  receiver: Receiver<HashMap<String, String>> ) {
  let dfs: &str = destination_file.as_ref();
  let destination = Path::new(dfs);
  create_dir_all(destination.parent().unwrap()).ok();
  println!("Writing to {}.", destination_file);
  match File::create(destination) {
    Ok(mut file) => {
    let mut stop = 3;
    while stop != 0 {
      match receiver.recv() {
      Ok(tl) => {
        // This is a stop command, decrement counter. We need to wait for main, stderr, stdout before stopping
        if tl.get("source") == Some(&LogSource::ControlSystem.to_string()) && tl.get("content") == Some(&"stop".to_string()) {
          stop = stop - 1;
        } else {
        let encoded = json::encode(&tl).unwrap();
        file.write_all(&encoded.into_bytes()).ok();
        file.write_all(b"\n").ok();
        }
      }
      Err(e) => println!("Receive error: {}", e),
      }
    }
    },
    Err(e) => panic!("Unable to open output file {}: {}", destination_file, e.to_string())
  }
  }
}

pub struct Runner;

impl Runner {
  pub fn run(&self, cmd: &str, parms: Vec<String>, log_root_directory: String, process_name: String, pwd: Option<String>) -> Result<ExitStatus, Error> {
    //https://github.com/rust-lang/rust/blob/b83b26bacb6371173cdec6bf68c7ffa69f858c84/src/libstd/process.rs
    fn read_timestamped_lines<T: Read + Send + 'static>(stream: Option<T>, source: LogSource, sender: Sender<HashMap<String, String>>) {
      match stream {
      Some(stream) => {
        thread::spawn(move || {
        let mut br = BufReader::with_capacity(64, stream);
        while {
          let mut line = String::new();
          let ok = match br.read_line(&mut line) {
          Ok(0) => false,
          Ok(_) => true,
          Err(e) => {println!("Something went wrong while reading the data: {}", e.to_string()); false}
          };
          if ok {
          let now = time::now();
          sender.send(TimestampedLine::tsl(source.clone(), now, line[0..line.len()-1].to_string())).unwrap();
          }
          ok
        } {}
        sender.send(TimestampedLine::stop_writer()).unwrap();
        });
      }
      None => sender.send(TimestampedLine::msg("Stream is None, not reading anything.".to_string())).unwrap()
      }
    }

    let start = time::now();

    let cmd_path = Path::new(cmd);
    let fname = match cmd_path.file_name() {
      Some(f) => {
      let mut pth = time::strftime(&format!("{}/{}/%Y/%m/%d/", log_root_directory, process_name), &start).ok().unwrap();
      let postfix = time::strftime("-%T.ajson", &start).ok().unwrap();
      let osname = f.to_string_lossy().into_owned();
      pth.push_str(&osname);
      pth.push_str(&postfix);
      pth
      },
      None => panic!("Unable to find file name for {}", cmd),
    };

    let (sender, receiver) = channel();
    let writer = Writer;
    let th = thread::spawn(move || {
      writer.run(fname, receiver);
    });
    let current_dir = pwd.unwrap_or(String::from("."));

    sender.send(TimestampedLine::msg(format!("Processing {} in folder {}", cmd, current_dir))).unwrap();

    let mut child = match Command::new(&cmd)
      .args(parms.as_ref())
      .current_dir(current_dir)
      .stdin(Stdio::null())
      .stdout(Stdio::piped())
      .stderr(Stdio::piped())
      .spawn() {
      Err(why) => panic!("couldn't spawn {}: {}", &cmd, why.to_string()),
      Ok(child) => child,
    };


    read_timestamped_lines(child.stdout.take(), LogSource::StdOut, sender.clone());
    read_timestamped_lines(child.stderr.take(), LogSource::StdErr, sender.clone());
    let status = child.wait();
    let end = time::now();

    let duration = (end-start).num_milliseconds().to_string();

    let outcome = match status {
    Ok(es) => if es.success() {
      let mut msg = TimestampedLine::msg("Process completed successfully.".to_string());
      msg.insert("completed".to_string(), "0".to_string());
      (msg, Ok(es))
      } else if let Some(ecode) = es.code() {
      (TimestampedLine::msg(format!("Process completed with error code {}.", ecode)), Ok(es))
      } else if let Some(esignal) = es.signal() {
      (TimestampedLine::msg(format!("Process aborted with signal {}.", esignal)), Ok(es))
      } else {
      (TimestampedLine::msg("Non-reachable condition reached. Something's wrong".to_string()), Ok(es))
      },
      Err(run_error) => (TimestampedLine::msg(format!("Something went wrong while getting the command output: {:?}", run_error)), Err(run_error)),
    };

    let (mut end_line, return_status) = outcome;

    end_line.insert("duration".to_string(), duration);
    sender.send(end_line).unwrap();

    sender.send(TimestampedLine::stop_writer()).unwrap();
    // Wait for writer thread to complete its writes
    let _ = th.join();
    return return_status;
  }
}