use anyhow::bail;
use colored::Colorize;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::io::Write;
use std::path::Path;
use tokio::sync::mpsc;
use crate::CTRLC_HANDLER;
use crate::entities::environment::RunEnvironment;
use crate::entities::info::ShortName;
use crate::entities::remote_host::RemoteHost;
use super::variables::Variable;
#[derive(Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Default)]
pub struct CustomCommand {
pub cmd: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub placeholders: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub env: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ignore_fails: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub show_success_output: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub show_cmd: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub only_when_fresh: Option<bool>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub remote_exec: Vec<ShortName>,
#[serde(skip_serializing_if = "Option::is_none")]
pub daemon: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub daemon_wait_seconds: Option<u64>,
}
impl CustomCommand {
pub async fn execute(
&self,
env: &RunEnvironment<'_>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<(bool, Vec<String>)> {
if !self.remote_exec.is_empty() {
return self.remote_execute(env, vars).await;
}
let mut output = vec![];
if !env.new_build && self.only_when_fresh.is_some_and(|v| v) {
if *crate::rw::VERBOSE.wait() {
output.push("Skip a command due to not a fresh run...".to_string());
}
return Ok((true, output));
}
let shell = match std::env::var("DEPLOYER_SH_PATH") {
Ok(path) => path,
Err(_) => "/bin/bash".to_string(),
};
let cmd = self.prepare(env, vars).await?;
let envs = self.fetch_envs(env, vars).await?;
let bash_c_info = format!(r#"{shell} -c "{cmd}""#).green();
let mut process = async_process::Command::new(&shell);
process
.current_dir(env.run_dir)
.arg("-c")
.arg(cmd)
.envs(envs)
.kill_on_drop(true);
let (ptx, prx) = async_io_pipe::async_pipe()?;
process.stdout(ptx.try_clone()?).stderr(ptx);
let mut child = process
.spawn()
.map_err(|e| anyhow::anyhow!("Can't execute command due to: {}", e))?;
if self.daemon.is_some_and(|v| v) {
self.start_daemon(env, child).await?;
if let Some(daemon_wait_seconds) = self.daemon_wait_seconds
&& daemon_wait_seconds > 0
{
tokio::time::sleep(std::time::Duration::from_secs(daemon_wait_seconds)).await;
}
return Ok((true, output));
}
let (dtx, mut drx) = mpsc::unbounded_channel();
let print = self.show_success_output.is_some_and(|v| v);
let show_cmd = self.show_cmd(vars);
let _bash_c_info = bash_c_info.clone();
let io_handle = tokio::spawn(async move {
let mut chunk = [0u8; 2048];
let mut stdout = std::io::stdout();
let mut first_line = true;
let mut last_new_line = true;
loop {
match prx.read(&mut chunk).await {
Ok(0) | Err(_) => break,
Ok(n) => {
if print {
if first_line {
let _ = stdout.write_all(b"\n");
if show_cmd {
let _ = stdout.write_all(format!("Executing `{}`:\n", _bash_c_info).as_bytes());
} else {
let _ = stdout.write_all(b"Executing the command:\n");
}
first_line = false;
}
let _ = stdout.write_all(&prefix_lines(&chunk[..n], b">>> ", &mut last_new_line));
let _ = stdout.flush();
}
let _ = dtx.send(chunk[..n].to_vec());
}
};
}
});
let success = child
.status()
.await
.map_err(|e| anyhow::anyhow!("Can't wait for exit status due to: {}", e))?
.success();
drop(child);
let mut buffer = Vec::with_capacity(8192);
io_handle.abort();
loop {
tokio::select! {
Some(chunk) = drx.recv() => buffer.extend_from_slice(&chunk),
_ = tokio::time::sleep(std::time::Duration::from_millis(20)) => break,
}
}
let cmd_out = String::from_utf8_lossy(&buffer).to_string();
output.extend_from_slice(&crate::utils::compose_output(
bash_c_info.to_string(),
cmd_out,
show_cmd,
));
if !self.ignore_fails.is_some_and(|v| v) && !success {
return Ok((false, output));
}
Ok((true, output))
}
pub async fn execute_observer(
&self,
env: &RunEnvironment<'_>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<(bool, Vec<String>)> {
let shell = match std::env::var("DEPLOYER_SH_PATH") {
Ok(path) => path,
Err(_) => "/bin/bash".to_string(),
};
let cmd = self.prepare(env, vars).await?;
let envs = self.fetch_envs(env, vars).await?;
let mut process = async_process::Command::new(&shell);
process.current_dir(env.run_dir).arg("-c").arg(cmd).envs(envs);
let child = process
.spawn()
.map_err(|e| anyhow::anyhow!("Can't execute command due to: {}", e))?;
{
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
*guard = Some(child);
}
let success = {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
{
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
if let Some(child) = guard.as_mut() {
if child.try_status().is_ok() {
break;
} else {
continue;
}
} else {
return Ok((false, vec!["Can't get child from `CTRLC_HANDLER`!".to_string()]));
}
}
}
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
if let Some(handle) = guard.as_mut() {
handle
.status()
.await
.map_err(|e| anyhow::anyhow!("Can't wait for exit status due to: {}", e))?
.success()
} else {
return Ok((false, vec!["Can't get child from `CTRLC_HANDLER`!".to_string()]));
}
};
if !self.ignore_fails.is_some_and(|v| v) && !success {
return Ok((false, vec![]));
}
Ok((true, vec![]))
}
pub async fn remote_execute<'a>(
&'a self,
env: &'a RunEnvironment<'a>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<(bool, Vec<String>)> {
let hosts = &self.remote_exec;
let mut output = vec![];
let shell = match std::env::var("DEPLOYER_SH_PATH") {
Ok(path) => path,
Err(_) => "/bin/bash".to_string(),
};
if !env.new_build && self.only_when_fresh.is_some_and(|v| v) {
if *crate::rw::VERBOSE.wait() {
output.push("Skip a command due to not a fresh run...".to_string());
}
return Ok((true, output));
}
let globals = crate::rw::read::<crate::globals::DeployerGlobalConfig>(&env.config_dir, crate::GLOBAL_CONF);
let cmd = self.prepare(env, vars).await?;
for hostname in hosts {
output.push(format!("Executing at: `{}`", hostname.as_str().green()));
let remote = match globals.remote_hosts.get(hostname) {
None => {
output.push("There is no such remote host in Deployer's Registry!".to_string());
if !self.ignore_fails.is_some_and(|v| v) {
return Ok((false, output));
}
continue;
}
Some(remote) => remote,
};
let mut session = remote.open_session().await?;
let bash_c_info = format!(r#"{shell} -c "{cmd} && echo $?""#).green();
let (s, out) = remote.exec(cmd.as_str(), &mut session).await?;
let mut composed = crate::utils::compose_output(bash_c_info.to_string(), out, self.show_cmd(vars));
composed.pop();
output.extend_from_slice(&composed);
if !self.ignore_fails.is_some_and(|v| v) && !s {
RemoteHost::close_session(&mut session).await?;
return Ok((false, output));
}
RemoteHost::close_session(&mut session).await?;
}
Ok((true, output))
}
pub async fn start_daemon(&self, env: &RunEnvironment<'_>, child: async_process::Child) -> anyhow::Result<()> {
env.daemons.add_daemon(child).await;
Ok(())
}
pub async fn start_shell(dir: &Path) -> anyhow::Result<()> {
let shell = match std::env::var("DEPLOYER_SH_PATH") {
Ok(path) => path,
Err(_) => "/bin/bash".to_string(),
};
let child = async_process::Command::new(&shell)
.current_dir(dir)
.spawn()
.map_err(|e| anyhow::anyhow!("Can't execute command due to: {}", e))?;
{
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
*guard = Some(child);
}
{
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
{
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
if let Some(child) = guard.as_mut() {
if child.try_status().is_ok() {
break;
} else {
continue;
}
} else {
return Ok(());
}
}
}
let mut guard = CTRLC_HANDLER.as_ref().lock().await;
if let Some(handle) = guard.as_mut() {
let _ = handle
.status()
.await
.map_err(|e| anyhow::anyhow!("Can't wait for exit status due to: {}", e))?
.success();
} else {
return Ok(());
}
}
Ok(())
}
pub async fn prepare(&self, env: &RunEnvironment<'_>, vars: &BTreeMap<String, Variable>) -> anyhow::Result<String> {
let mut cmd = self.cmd.to_owned();
for ph in self.placeholders.iter() {
cmd = cmd.replace(
ph.as_str(),
vars
.iter()
.find(|(k, _)| k.as_str().eq(ph.as_str()))
.ok_or(anyhow::anyhow!("Can't find variable for `{ph}` placeholder!"))?
.1
.get_value(env)
.await?
.as_str(),
);
}
Ok(cmd)
}
pub async fn fetch_envs(
&self,
env: &RunEnvironment<'_>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<Vec<(String, String)>> {
let mut env_map = vec![];
for env_var in self.env.iter().map(|_v| {
vars
.iter()
.find(|(k, _)| k.as_str().eq(_v.as_str()))
.map(|(_, v)| (_v.to_owned(), v.to_owned()))
.ok_or(anyhow::anyhow!("Can't find `{_v}` environment variable!"))
}) {
env_map.push(env_var?);
}
let mut fetched_env_map = Vec::with_capacity(env_map.len());
for (env_name, variable) in env_map {
let value = variable.get_value(env).await?;
fetched_env_map.push((env_name, value));
}
Ok(fetched_env_map)
}
pub fn daemon_cmd(cmd: impl AsRef<str>, daemon: &Option<bool>, daemon_wait_seconds: &Option<u64>) -> Vec<String> {
if daemon.is_some_and(|v| v) {
let mut cmds = vec![format!("{} > /dev/null 2>&1 &", cmd.as_ref())];
if let Some(seconds) = daemon_wait_seconds {
cmds.push(format!("sleep {seconds}"));
}
cmds
} else {
vec![cmd.as_ref().to_owned()]
}
}
pub fn show_cmd(&self, vars: &BTreeMap<String, Variable>) -> bool {
self.show_cmd.is_none_or(|v| v)
&& vars
.iter()
.filter(|(k, _)| self.placeholders.contains(k))
.all(|(_, v)| !v.is_secret)
}
pub fn escape(cmd: impl AsRef<str>) -> String {
let mut escaped = String::new();
for c in cmd.as_ref().chars() {
match c {
'%' => escaped.push_str("%%"),
'"' => escaped.push_str("\\\""),
'\'' => escaped.push_str("'\\''"),
'`' => escaped.push_str("\\`"),
'\n' => escaped.push_str("\\n"),
'\t' => escaped.push_str("\\t"),
'\r' => escaped.push_str("\\r"),
'\\' => escaped.push_str("\\\\"),
_ => escaped.push(c),
}
}
escaped
}
pub fn escape_with_spaces(cmd: impl AsRef<str>) -> String {
let mut escaped = String::new();
for c in cmd.as_ref().chars() {
match c {
' ' => escaped.push_str("\\ "),
'%' => escaped.push_str("%%"),
'"' => escaped.push_str("\\\""),
'\'' => escaped.push_str("'\\''"),
'`' => escaped.push_str("\\`"),
'\n' => escaped.push_str("\\n"),
'\t' => escaped.push_str("\\t"),
'\r' => escaped.push_str("\\r"),
'\\' => escaped.push_str("\\\\"),
_ => escaped.push(c),
}
}
escaped
}
pub fn into_remote_cmds(&self, prepared_cmd: String, env: &RunEnvironment<'_>) -> anyhow::Result<Vec<String>> {
let mut remote_cmds = vec![];
let hosts = &self.remote_exec;
if hosts.is_empty() {
return Ok(vec![prepared_cmd]);
}
for host in hosts {
if let Some((_, host_info)) = env.remotes.iter().find(|r| r.0.eq(host)) {
let remote_prefix = format!("ssh -p {} {}@{}", host_info.port, host_info.username, host_info.ip);
remote_cmds.push(format!("{remote_prefix} \"{}\"", Self::escape(&prepared_cmd)));
} else {
bail!("{host}: There is no such remote host in Deployer's Registry!");
}
}
Ok(remote_cmds)
}
pub async fn run_simple(env: &RunEnvironment<'_>, cmd: impl ToString) -> anyhow::Result<Vec<String>> {
let cmd = cmd.to_string();
let mut output = vec![];
let shell = match std::env::var("DEPLOYER_SH_PATH") {
Ok(path) => path,
Err(_) => "/bin/bash".to_string(),
};
let mut process = async_process::Command::new(&shell);
process
.current_dir(env.run_dir)
.arg("-c")
.arg(cmd.clone())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
let child = process
.spawn()
.map_err(|e| anyhow::anyhow!("Can't execute command due to: {}", e))?;
let command_output = child
.output()
.await
.map_err(|e| anyhow::anyhow!("Can't wait for output due to: {}", e))?;
let stdout_strs = String::from_utf8_lossy(&command_output.stdout).to_string();
let stderr_strs = String::from_utf8_lossy(&command_output.stderr).to_string();
output.extend(stdout_strs.split('\n').map(|s| s.to_string()));
output.extend(stderr_strs.split('\n').map(|s| s.to_string()));
if !command_output.status.success() {
anyhow::bail!("Get an error during execution of: `{}`", cmd);
}
Ok(output)
}
pub async fn run_simple_observer(env: &RunEnvironment<'_>, cmd: impl ToString) -> anyhow::Result<()> {
use crate::bmap;
use crate::rw::log;
let cmd = cmd.to_string();
if !(CustomCommand {
cmd: {
log(format!("Given command: `{cmd}`"));
cmd.to_owned()
},
ignore_fails: None,
only_when_fresh: None,
placeholders: Default::default(),
env: Default::default(),
remote_exec: Default::default(),
show_cmd: None,
show_success_output: Some(true),
daemon: None,
daemon_wait_seconds: None,
})
.execute_observer(
&RunEnvironment {
daemons: env.daemons.clone(),
skipper: env.skipper.clone(),
restart_requested: env.restart_requested.clone(),
..(*env)
},
&bmap!(),
)
.await?
.0
{
anyhow::bail!("Get an error during execution of: `{}`", cmd);
}
Ok(())
}
pub async fn to_shell_raw(
&self,
env: &RunEnvironment<'_>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<Vec<String>> {
self.into_remote_cmds(self.prepare(env, vars).await?, env)
}
pub async fn to_shell(
&self,
env: &RunEnvironment<'_>,
vars: &BTreeMap<String, Variable>,
) -> anyhow::Result<Vec<String>> {
let mut instructions = vec![];
if self.ignore_fails.is_some_and(|v| v) {
instructions.push("set +e".to_string());
}
let variants = self.into_remote_cmds(self.prepare(env, vars).await?, env)?;
let envs = self.fetch_envs(env, vars).await?;
let show_cmd = self.show_cmd(vars);
if show_cmd {
let info = if !env.ansible_run {
format!(
"printf 'Executing `%b{}%b`...\\n' \"$GREEN\" \"$RESET\"",
Self::escape(self.cmd.as_str()),
)
} else {
format!("printf 'Executing `{}`...\\n'", Self::escape(self.cmd.as_str()))
};
instructions.push(info);
}
envs.iter().for_each(|(k, v)| instructions.push(format!("{k}={v}")));
instructions.extend_from_slice(&variants);
envs.iter().for_each(|(k, _)| instructions.push(format!("unset {k}")));
if self.ignore_fails.is_some_and(|v| v) {
instructions.push("set -e".to_string());
}
Ok(instructions)
}
}
fn prefix_lines(data: &[u8], prefix: &[u8], last_new_line: &mut bool) -> Vec<u8> {
let mut result = Vec::with_capacity(data.len());
let mut at_line_start = *last_new_line;
for &byte in data {
if at_line_start && byte != b'\n' {
result.extend_from_slice(prefix);
at_line_start = false;
}
result.push(byte);
if byte == b'\n' {
at_line_start = true;
*last_new_line = true;
} else {
*last_new_line = false;
}
}
result
}