terraform 0.1.0

A wrapper around the Terraform CLI.
Documentation
mod errors;

use std::collections::HashMap;
use std::ffi::OsStr;
use std::io::{BufRead, BufReader, Read};
use std::path::Path;
use std::process::{Child, Command, Stdio};
use std::sync::mpsc::{channel, Sender};
use std::time::{Duration, Instant};

#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;

pub use errors::Error;

/// Launch configuration for an external binary.
///
/// Holds everything needed to start the program repeatedly with different
/// arguments: the executable path, the directory it runs in, the environment
/// variables handed to it and the maximum time a single run may take.
///
/// The type parameters are intentionally unbounded here; the `AsRef<Path>`
/// requirement lives on the `impl` block that actually needs it.
#[derive(Debug, Clone)]
pub struct Process<P, Q> {
    /// Path of the executable to launch.
    binary_path: P,
    /// Directory the child process is started in.
    working_directory: Q,
    /// Environment variables passed to the child process.
    envs: HashMap<String, String>,
    /// Maximum run time handed to every spawned `ProcessContext`.
    timeout: Duration,
}

impl<P, Q> Process<P, Q>
where
    P: AsRef<Path>,
    Q: AsRef<Path>,
{
    pub fn new(binary_path: P, working_directory: Q, envs: HashMap<String, String>, timeout: Duration) -> Self {
        Self {
            binary_path,
            working_directory,
            envs,
            timeout,
        }
    }

    pub fn spawn<I, S>(&self, args: I) -> Result<ProcessContext, Error>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        let mut command = Command::new(self.binary_path.as_ref());
        let command = command
            .current_dir(self.working_directory.as_ref())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .args(args)
            .envs(&self.envs);

        let context = ProcessContext::new(command, self.timeout)?;

        Ok(context)
    }
}

/// A spawned child process together with the output and exit information
/// gathered while waiting for it.
pub struct ProcessContext {
    /// Handle of the running child process.
    child: Child,
    /// Instant captured in `new`, just before the child is spawned; the
    /// timeout is measured from here.
    start: Instant,
    /// Maximum time the child may run before `wait` kills it.
    timeout: Duration,

    /// Lines read from the child's stdout while waiting.
    pub stdout: Vec<String>,
    /// Lines read from the child's stderr while waiting.
    pub stderr: Vec<String>,
    /// Exit code of the child; `None` until `wait` observes the exit status.
    pub exit_code: Option<i32>,
    /// Signal reported by the exit status (unix only); `None` until `wait`
    /// observes the exit status.
    #[cfg(unix)]
    pub signal_code: Option<i32>,
}

impl ProcessContext {
    pub fn new(command: &mut Command, timeout: Duration) -> Result<Self, Error> {
        let start = Instant::now();

        Ok(Self {
            child: command.spawn()?,
            start,
            timeout,
            stdout: Vec::new(),
            stderr: Vec::new(),
            exit_code: None,
            #[cfg(unix)]
            signal_code: None,
        })
    }

    pub fn wait<'a, P, Q>(mut self, mut stdout: P, mut stderr: Q) -> Result<Self, Error>
    where
        P: 'a + FnMut(Option<String>),
        Q: 'a + FnMut(Option<String>),
    {
        let (stdout_tx, stdout_rx) = channel();
        let stdout_processor = StreamProcessor::new(self.child.stdout.take(), stdout_tx);

        let stdout_reader = std::thread::spawn(|| {
            stdout_processor.stream();
        });

        let (stderr_tx, stderr_rx) = channel();
        let stderr_processor = StreamProcessor::new(self.child.stderr.take(), stderr_tx);

        let stderr_reader = std::thread::spawn(|| {
            stderr_processor.stream();
        });

        loop {
            match self.child.try_wait() {
                Err(_) => {
                    let _ = self.child.kill().map(|_| self.child.wait());
                    let _ = stdout_reader.join();
                    let _ = stderr_reader.join();

                    return Err(Error::TimeoutError);
                }
                Ok(Some(status)) => {
                    self.exit_code = status.code();

                    if cfg!(unix) {
                        self.signal_code = status.signal();
                    }

                    let _ = stdout_reader.join();
                    let _ = stderr_reader.join();
                    return Ok(self);
                }
                Ok(None) => {
                    if self.start.elapsed().as_secs() < self.timeout.as_secs() {
                        std::thread::sleep(std::time::Duration::from_millis(20));

                        while let Ok(line) = stdout_rx.try_recv() {
                            if let Ok(line) = line {
                                stdout(Some(line.clone()));
                                self.stdout.push(line);
                            } else {
                                stdout(None);
                                self.stdout.push(String::from("<error retrieving stream content>"));
                            }
                        }

                        while let Ok(line) = stderr_rx.try_recv() {
                            if let Ok(line) = line {
                                stderr(Some(line.clone()));
                                self.stderr.push(line);
                            } else {
                                stderr(None);
                                self.stderr.push(String::from("<error retrieving stream content>"));
                            }
                        }

                        continue;
                    }

                    let _ = self.child.kill().map(|_| self.child.wait());
                    let _ = stdout_reader.join();
                    let _ = stderr_reader.join();
                    return Err(Error::TimeoutError);
                }
            };
        }
    }
}

pub struct StreamProcessor<T>
where
    T: Read,
{
    source: Option<T>,
    sender: Sender<Result<String, Error>>,
}

impl<T> StreamProcessor<T>
where
    T: Read,
{
    pub fn new(source: Option<T>, sender: Sender<Result<String, Error>>) -> Self {
        Self { source, sender }
    }

    fn stream(self) {
        if let Some(source) = self.source {
            for line in BufReader::new(source).lines().enumerate() {
                let (_, line) = line;
                let _ = self.sender.send(line.map_err(|e| Error::IOError(e.to_string())));
            }
        }
    }
}