1use std::error::Error;
5use std::ffi::OsStr;
6use std::marker;
7use std::process::Stdio;
8
9use async_trait::async_trait;
10use log;
11use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
12use tokio::process::Child;
13use tokio::sync::mpsc::{Receiver, Sender};
14use tokio::time::Duration;
15
16#[derive(Debug)]
20pub enum LogType {
21 Info,
22 Error,
23}
24
25pub trait ProcessStatus<T, E>
29where
30 E: Error + Send,
31 Self: Send,
32{
33 fn status_entry(&self) -> T;
35 fn status_exit(&self) -> T;
37 fn error_type(&self) -> E;
39 fn wrap_error<F: Error + Sync + Send + 'static>(&self, error: F, message: Option<String>) -> E;
41}
42
43#[derive(Debug)]
47pub struct LogOutputData {
48 line: String,
49 log_type: LogType,
50}
51
52#[async_trait]
56pub trait AsyncCommand<S, E, P>
57where
58 E: Error + Send,
59 P: ProcessStatus<S, E> + Send,
60 Self: Sized,
61{
62 fn new<A, B>(executable_path: &OsStr, args: A, process_type: P) -> Result<Self, E>
66 where
67 A: IntoIterator<Item = B>,
68 B: AsRef<OsStr>;
69 async fn execute(&mut self, timeout: Option<Duration>) -> Result<S, E>;
76}
77
78pub struct AsyncCommandExecutor<S, E, P>
82where
83 S: Send,
84 E: Error + Send,
85 P: ProcessStatus<S, E>,
86 Self: Send,
87{
88 command: tokio::process::Command,
90 process: Child,
92 process_type: P,
94 _marker_s: marker::PhantomData<S>,
95 _marker_e: marker::PhantomData<E>,
96}
97
98impl<S, E, P> AsyncCommandExecutor<S, E, P>
99where
100 S: Send,
101 E: Error + Send,
102 P: ProcessStatus<S, E> + Send,
103{
104 fn init(command: &mut tokio::process::Command, process_type: &P) -> Result<Child, E> {
106 command
107 .stdout(Stdio::piped())
108 .stderr(Stdio::piped())
109 .spawn()
110 .map_err(|_| process_type.error_type())
111 }
112
113 fn generate_command<A, B>(executable_path: &OsStr, args: A) -> tokio::process::Command
115 where
116 A: IntoIterator<Item = B>,
117 B: AsRef<OsStr>,
118 {
119 let mut command = tokio::process::Command::new(executable_path);
120 command.args(args);
121 command
122 }
123
124 async fn handle_output<R: AsyncRead + Unpin>(data: R, sender: Sender<LogOutputData>) -> () {
126 let mut lines = BufReader::new(data).lines();
127 while let Some(line) = lines.next_line().await.expect("error handling output") {
128 let io_data = LogOutputData {
129 line,
130 log_type: LogType::Info,
131 };
132 sender
133 .send(io_data)
134 .await
135 .expect("error sending log output data");
136 }
137 }
138
139 async fn log_output(mut receiver: Receiver<LogOutputData>) -> () {
141 while let Some(data) = receiver.recv().await {
142 match data.log_type {
143 LogType::Info => {
144 log::info!("{}", data.line);
145 }
146 LogType::Error => {
147 log::error!("{}", data.line);
148 }
149 }
150 }
151 }
152
153 async fn run_process(&mut self) -> Result<S, E> {
155 let exit_status = self
156 .process
157 .wait()
158 .await
159 .map_err(|e| self.process_type.wrap_error(e, None))?;
160 if exit_status.success() {
161 Ok(self.process_type.status_exit())
162 } else {
163 Err(self.process_type.error_type())
164 }
165 }
166
167 #[cfg(not(target_os = "windows"))]
168 async fn command_execution(&mut self) -> Result<S, E> {
169 let (sender, receiver) = tokio::sync::mpsc::channel::<LogOutputData>(1000);
170 let res = self.run_process().await;
171 let stdout = self.process.stdout.take().unwrap();
172 let stderr = self.process.stderr.take().unwrap();
173 let tx = sender.clone();
174 let _ = tokio::task::spawn(async { Self::handle_output(stdout, tx).await });
175 let _ = tokio::task::spawn(async { Self::handle_output(stderr, sender).await });
176 let _ = tokio::task::spawn(async { Self::log_output(receiver).await });
177 res
178 }
179
180 #[cfg(target_os = "windows")]
181 async fn command_execution(&mut self) -> Result<S, E> {
182 let res = self.run_process().await;
185 res
192 }
193
194}
195
196#[async_trait]
197impl<S, E, P> AsyncCommand<S, E, P> for AsyncCommandExecutor<S, E, P>
198where
199 S: Send,
200 E: Error + Send,
201 P: ProcessStatus<S, E> + Send,
202{
203 fn new<A, B>(executable_path: &OsStr, args: A, process_type: P) -> Result<Self, E>
204 where
205 A: IntoIterator<Item = B>,
206 B: AsRef<OsStr>,
207 {
208 let mut command = Self::generate_command(executable_path, args);
209 let process = Self::init(&mut command, &process_type)?;
210 Ok(AsyncCommandExecutor {
211 command,
212 process,
213 process_type,
214 _marker_s: Default::default(),
215 _marker_e: Default::default(),
216 })
217 }
218
219 async fn execute(&mut self, timeout: Option<Duration>) -> Result<S, E> {
220 match timeout {
221 None => self.command_execution().await,
222 Some(duration) => tokio::time::timeout(duration, self.command_execution())
223 .await
224 .map_err(|e| {
225 self.process_type
226 .wrap_error(e, Some(String::from("timed out")))
227 })?,
228 }
229 }
230}