use crate::{StepLog, WorkflowLogType};
use std::{
path::PathBuf,
process::{ExitStatus, Stdio},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command as Cmd,
sync::broadcast,
};
/// One message captured while running a command, paired with its log category.
///
/// `Command::send_log` turns a `Log` into a `StepLog` by adding a timestamp.
#[derive(Debug, Clone)]
pub struct Log {
/// Category of the message; this file uses `WorkflowLogType::Log` for stdout
/// lines and `WorkflowLogType::Error` for stderr lines and read errors.
pub log_type: WorkflowLogType,
/// Text of the message (a single output line or an error description).
pub message: String,
}
/// Wrapper around a `tokio::process::Command` that can optionally forward the
/// child's output, line by line, to a broadcast channel of `StepLog`s.
#[derive(Debug)]
pub struct Command {
// The underlying tokio process builder (imported as `Cmd`).
command: Cmd,
// Destination for log lines; `None` means output is read but not forwarded.
log_sender: Option<broadcast::Sender<StepLog>>,
}
impl Command {
pub fn new(cmd: impl Into<String>) -> Self {
if cfg!(target_os = "windows") {
Command::powershell(cmd)
} else {
Command::sh(cmd)
}
}
pub fn powershell(cmd: impl Into<String>) -> Self {
let cmd: String = cmd.into();
let mut command = Cmd::new("powershell.exe");
command
.arg("-NoProfile")
.arg("-NonInteractive")
.arg("-Command")
.arg(cmd);
Command {
command,
log_sender: None,
}
}
pub fn sh(cmd: impl Into<String>) -> Self {
let cmd: String = cmd.into();
let mut command = Cmd::new("sh");
command.arg("-c").arg(cmd);
Command {
command,
log_sender: None,
}
}
pub fn set_log_sender(&mut self, sender: broadcast::Sender<StepLog>) {
self.log_sender = Some(sender);
}
#[allow(unused)]
pub fn env(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
self.command.env(key.into(), value.into());
self
}
#[allow(unused)]
pub fn dir(&mut self, dir: &PathBuf) -> &mut Self {
self.command.current_dir(dir);
self
}
#[allow(unused)]
pub fn arg<S>(&mut self, arg: S) -> &mut Self
where
S: AsRef<std::ffi::OsStr>,
{
self.command.arg(arg);
self
}
#[allow(unused)]
pub async fn exec(&mut self) -> anyhow::Result<String> {
let output = self.command.output().await?;
if output.status.success() {
let stdout = String::from_utf8(output.stdout)?;
return Ok(stdout.trim().to_string());
}
let stderr = String::from_utf8(output.stderr)?;
Err(anyhow::anyhow!(stderr))
}
pub async fn run(&mut self) -> anyhow::Result<ExitStatus> {
let mut child = self
.command
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let out = child
.stdout
.take()
.ok_or(anyhow::anyhow!("Failed to get stdout from child process"))?;
let err = child
.stderr
.take()
.ok_or(anyhow::anyhow!("Failed to get stderr from child process"))?;
let out = BufReader::new(out);
let err = BufReader::new(err);
let mut lines = out.lines();
let mut errors = err.lines();
loop {
tokio::select! {
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
self.send_log(Log {
log_type: WorkflowLogType::Log,
message: line,
});
}
Ok(None) => {
break;
}
Err(err) => {
self.send_log(Log {
log_type: WorkflowLogType::Error,
message: err.to_string(),
});
break;
}
}
}
error = errors.next_line() => {
match error {
Ok(Some(error)) => {
self.send_log(Log {
log_type: WorkflowLogType::Error,
message: error,
});
}
Ok(None) => {
break;
}
Err(err) => {
self.send_log(Log {
log_type: WorkflowLogType::Error,
message: err.to_string(),
});
break;
}
}
}
}
}
let status = child.wait().await?;
Ok(status)
}
fn send_log(&self, log: Log) {
let sender = match &self.log_sender {
Some(sender) => sender,
None => return,
};
let step_log = StepLog {
log_type: log.log_type,
message: log.message,
time: chrono::Utc::now(),
};
if let Err(err) = sender.send(step_log) {
log::error!("Failed to send log: {}", err);
}
}
}