terraform/process/
mod.rs

1mod errors;
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::io::{BufRead, BufReader, Read};
6use std::path::Path;
7use std::process::{Child, Command, Stdio};
8use std::sync::mpsc::{channel, Sender};
9use std::time::{Duration, Instant};
10
11#[cfg(unix)]
12use std::os::unix::process::ExitStatusExt;
13
14pub use errors::Error;
15
16pub struct Process<P, Q>
17where
18    P: AsRef<Path>,
19    Q: AsRef<Path>,
20{
21    binary_path: P,
22    working_directory: Q,
23    envs: HashMap<String, String>,
24    timeout: Duration,
25}
26
27impl<P, Q> Process<P, Q>
28where
29    P: AsRef<Path>,
30    Q: AsRef<Path>,
31{
32    pub fn new(binary_path: P, working_directory: Q, envs: HashMap<String, String>, timeout: Duration) -> Self {
33        Self {
34            binary_path,
35            working_directory,
36            envs,
37            timeout,
38        }
39    }
40
41    pub fn spawn<I, S>(&self, args: I) -> Result<ProcessContext, Error>
42    where
43        I: IntoIterator<Item = S>,
44        S: AsRef<OsStr>,
45    {
46        let mut command = Command::new(self.binary_path.as_ref());
47        let command = command
48            .current_dir(self.working_directory.as_ref())
49            .stdout(Stdio::piped())
50            .stderr(Stdio::piped())
51            .args(args)
52            .envs(&self.envs);
53
54        let context = ProcessContext::new(command, self.timeout)?;
55
56        Ok(context)
57    }
58}
59
60pub struct ProcessContext {
61    child: Child,
62    start: Instant,
63    timeout: Duration,
64
65    pub stdout: Vec<String>,
66    pub stderr: Vec<String>,
67    pub exit_code: Option<i32>,
68    #[cfg(unix)]
69    pub signal_code: Option<i32>,
70}
71
72impl ProcessContext {
73    pub fn new(command: &mut Command, timeout: Duration) -> Result<Self, Error> {
74        let start = Instant::now();
75
76        Ok(Self {
77            child: command.spawn()?,
78            start,
79            timeout,
80            stdout: Vec::new(),
81            stderr: Vec::new(),
82            exit_code: None,
83            #[cfg(unix)]
84            signal_code: None,
85        })
86    }
87
88    pub fn wait<'a, P, Q>(mut self, mut stdout: P, mut stderr: Q) -> Result<Self, Error>
89    where
90        P: 'a + FnMut(Option<String>),
91        Q: 'a + FnMut(Option<String>),
92    {
93        let (stdout_tx, stdout_rx) = channel();
94        let stdout_processor = StreamProcessor::new(self.child.stdout.take(), stdout_tx);
95
96        let stdout_reader = std::thread::spawn(|| {
97            stdout_processor.stream();
98        });
99
100        let (stderr_tx, stderr_rx) = channel();
101        let stderr_processor = StreamProcessor::new(self.child.stderr.take(), stderr_tx);
102
103        let stderr_reader = std::thread::spawn(|| {
104            stderr_processor.stream();
105        });
106
107        loop {
108            match self.child.try_wait() {
109                Err(_) => {
110                    let _ = self.child.kill().map(|_| self.child.wait());
111                    let _ = stdout_reader.join();
112                    let _ = stderr_reader.join();
113
114                    return Err(Error::TimeoutError);
115                }
116                Ok(Some(status)) => {
117                    self.exit_code = status.code();
118
119                    if cfg!(unix) {
120                        self.signal_code = status.signal();
121                    }
122
123                    let _ = stdout_reader.join();
124                    let _ = stderr_reader.join();
125                    return Ok(self);
126                }
127                Ok(None) => {
128                    if self.start.elapsed().as_secs() < self.timeout.as_secs() {
129                        std::thread::sleep(std::time::Duration::from_millis(20));
130
131                        while let Ok(line) = stdout_rx.try_recv() {
132                            if let Ok(line) = line {
133                                stdout(Some(line.clone()));
134                                self.stdout.push(line);
135                            } else {
136                                stdout(None);
137                                self.stdout.push(String::from("<error retrieving stream content>"));
138                            }
139                        }
140
141                        while let Ok(line) = stderr_rx.try_recv() {
142                            if let Ok(line) = line {
143                                stderr(Some(line.clone()));
144                                self.stderr.push(line);
145                            } else {
146                                stderr(None);
147                                self.stderr.push(String::from("<error retrieving stream content>"));
148                            }
149                        }
150
151                        continue;
152                    }
153
154                    let _ = self.child.kill().map(|_| self.child.wait());
155                    let _ = stdout_reader.join();
156                    let _ = stderr_reader.join();
157                    return Err(Error::TimeoutError);
158                }
159            };
160        }
161    }
162}
163
164pub struct StreamProcessor<T>
165where
166    T: Read,
167{
168    source: Option<T>,
169    sender: Sender<Result<String, Error>>,
170}
171
172impl<T> StreamProcessor<T>
173where
174    T: Read,
175{
176    pub fn new(source: Option<T>, sender: Sender<Result<String, Error>>) -> Self {
177        Self { source, sender }
178    }
179
180    fn stream(self) {
181        if let Some(source) = self.source {
182            for line in BufReader::new(source).lines().enumerate() {
183                let (_, line) = line;
184                let _ = self.sender.send(line.map_err(|e| Error::IOError(e.to_string())));
185            }
186        }
187    }
188}