use crate::{
l2::{
deploy::{AppL2, cli::GlobalConfig},
types::{ExePlayL2, LazyExePlayL2},
},
types::StackName,
utils::{dump_json, json_to_yaml},
};
use anyhow::{Context as _, Result};
use cdk_ansible_core::core::Play;
use clap::Args;
use futures::future::{BoxFuture, FutureExt as _};
use std::{path::PathBuf, process::ExitStatus, sync::Arc};
use thiserror::Error;
use tokio::{fs, process::Command, sync::Semaphore, task::JoinSet};
// Command-line arguments of the deploy subcommand.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros,
// `///` doc comments are turned into help text, so adding them would change
// the program's `--help` output.
#[derive(Args, Debug, Clone)]
pub struct Deploy {
// Command line used to run a playbook. `DeployConfig::new` splits it with
// shell-like quoting rules (shlex); the inventory and playbook arguments are
// appended to it when a play is executed.
#[arg(
short = 'c',
long,
required = false,
default_value = "ansible-playbook"
)]
pub playbook_command: String,
// Number of permits of the semaphore that every playbook command must acquire
// before it is started, i.e. the upper bound on concurrently running commands.
#[arg(short = 'P', long, required = false, default_value = "2")]
pub max_procs: usize,
// When set, the playbook and inventory files are written but the playbook
// command is not executed. Declared `exclusive` to clap.
#[arg(long, exclusive = true, default_value = "false")]
pub synth: bool,
// Optional positional stack name. When present only that stack is deployed,
// otherwise every stack of the application is deployed.
pub stack_name: Option<String>,
}
impl Deploy {
    /// Entry point of the deploy subcommand.
    ///
    /// Converts the raw CLI arguments into a [`DeployConfig`] and hands over to
    /// [`deploy`].
    ///
    /// # Errors
    ///
    /// Returns an error when the arguments cannot be converted into a
    /// configuration or when the deployment itself fails.
    pub async fn run(self, app: &AppL2, global_config: Arc<GlobalConfig>) -> Result<()> {
        let config = DeployConfig::new(self).map(Arc::new)?;
        deploy(app, &global_config, &config).await
    }
}
/// Validated, ready-to-use form of the [`Deploy`] command-line arguments.
#[derive(Debug, Clone)]
struct DeployConfig {
/// Playbook command already split into words: the first element is the
/// program to execute, the remaining elements are its leading arguments.
playbook_command: Vec<String>,
/// Number of permits given to the semaphore that guards playbook commands.
max_procs: usize,
/// When `true`, playbook and inventory files are written but no command is run.
synth: bool,
/// Stack to deploy; `None` means every stack of the application.
stack_name: Option<StackName>,
}
impl DeployConfig {
pub fn new(args: Deploy) -> Result<Self> {
Ok(Self {
playbook_command: ::shlex::split(&args.playbook_command)
.with_context(|| "parsing playbook command")?,
max_procs: args.max_procs,
synth: args.synth,
stack_name: args.stack_name.map(|s| StackName::from(s.as_str())),
})
}
}
async fn deploy(
app: &AppL2,
global_config: &Arc<GlobalConfig>,
deploy_config: &Arc<DeployConfig>,
) -> Result<()> {
let playbook_dir = Arc::new(global_config.playbook_dir.clone());
let inventory_dir = Arc::new(global_config.inventory_dir.clone());
if playbook_dir.exists() {
fs::remove_dir_all(playbook_dir.as_ref().clone()).await?;
}
if inventory_dir.exists() {
fs::remove_dir_all(inventory_dir.as_ref().clone()).await?;
}
let cmd_semaphore = Arc::new(Semaphore::new(deploy_config.max_procs));
for stack in (deploy_config
.stack_name
.as_ref()
.map(|n| {
[app.inner
.stack_container
.get_stack(n)
.with_context(|| format!("getting stack: {n}"))]
.into_iter()
.collect::<Result<Vec<_>>>()
})
.unwrap_or_else(|| Ok(app.inner.stack_container.get_stacks().collect())))?
{
recursive_deploy(
stack.name().to_string().to_lowercase().replace(' ', "_"),
stack.exe_play().clone(),
Arc::clone(&playbook_dir),
Arc::clone(&inventory_dir),
Arc::clone(deploy_config),
Arc::clone(&cmd_semaphore),
)
.await?;
}
Ok(())
}
/// Walks a lazily evaluated play tree and deploys every leaf.
///
/// * `Sequential` children are deployed one after another; the first failure
///   stops the sequence.
/// * `Parallel` children are spawned as tasks on a [`JoinSet`]; all of them
///   are awaited and their failures are reported together as
///   [`DeployL2Error::Parallel`]. A task that cannot be joined ends the wait
///   immediately with [`DeployL2Error::Other`].
/// * `Single` resolves the lazy play and passes the result to
///   [`deploy_exe_play_l2`].
///
/// `name` is extended with `_s{i}` / `_p{i}` for the i-th sequential / parallel
/// child. The future is boxed because the function calls itself.
fn recursive_deploy(
    name: String,
    lazy_exe_play: LazyExePlayL2,
    playbook_dir: Arc<PathBuf>,
    inventory_dir: Arc<PathBuf>,
    deploy_config: Arc<DeployConfig>,
    cmd_semaphore: Arc<Semaphore>,
) -> BoxFuture<'static, std::result::Result<(), DeployL2Error>> {
    async move {
        match lazy_exe_play {
            LazyExePlayL2::Single(lazy_play) => {
                let exe_play = lazy_play.lazy_play_l2().await?;
                deploy_exe_play_l2(
                    name,
                    exe_play,
                    playbook_dir,
                    inventory_dir,
                    deploy_config,
                    cmd_semaphore,
                )
                .await?;
            }
            LazyExePlayL2::Sequential(children) => {
                for (idx, child) in children.into_iter().enumerate() {
                    let child_name = format!("{name}_s{idx}");
                    recursive_deploy(
                        child_name,
                        child,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    )
                    .await?;
                }
            }
            LazyExePlayL2::Parallel(children) => {
                let mut tasks = JoinSet::new();
                for (idx, child) in children.into_iter().enumerate() {
                    let child_name = format!("{name}_p{idx}");
                    tasks.spawn(recursive_deploy(
                        child_name,
                        child,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    ));
                }

                // Failures are gathered in completion order; a join error
                // aborts the wait right away and discards what was gathered.
                let mut failures: Vec<Box<dyn std::error::Error + Send + Sync>> = Vec::new();
                while let Some(joined) = tasks.join_next().await {
                    match joined {
                        Err(join_err) => return Err(DeployL2Error::Other(join_err.into())),
                        Ok(Err(task_err)) => failures.push(task_err.into()),
                        Ok(Ok(())) => {}
                    }
                }
                if !failures.is_empty() {
                    return Err(DeployL2Error::Parallel { errors: failures });
                }
            }
        }
        Ok(())
    }
    .boxed()
}
/// Writes and, unless synthesizing, runs the plays of an evaluated play tree.
///
/// * `Sequential` children are handled one after another; the first failure
///   stops the sequence.
/// * `Parallel` children are spawned as tasks on a [`JoinSet`]; all of them
///   are awaited and their failures are reported together as
///   [`DeployL2Error::Parallel`]. A task that cannot be joined ends the wait
///   immediately with [`DeployL2Error::Other`].
/// * `Single` writes `<playbook_dir>/<name>.json` and
///   `<inventory_dir>/<name>.json`, converts both with `json_to_yaml`, and,
///   when `deploy_config.synth` is `false`, runs the playbook command with
///   `-i <inventory yaml> <playbook yaml>` appended while holding a permit of
///   `cmd_semaphore`.
///
/// `name` is extended with `_s{i}` / `_p{i}` for the i-th sequential / parallel
/// child. The future is boxed because the function calls itself.
///
/// # Errors
///
/// Besides the variants mentioned above, a playbook command that exits
/// unsuccessfully yields [`DeployL2Error::Command`] carrying its captured
/// output; any other failure is reported as [`DeployL2Error::Other`].
fn deploy_exe_play_l2(
    name: String,
    exe_play: ExePlayL2,
    playbook_dir: Arc<PathBuf>,
    inventory_dir: Arc<PathBuf>,
    deploy_config: Arc<DeployConfig>,
    cmd_semaphore: Arc<Semaphore>,
) -> BoxFuture<'static, std::result::Result<(), DeployL2Error>> {
    async move {
        match exe_play {
            ExePlayL2::Sequential(eps) => {
                for (i, ep) in eps.into_iter().enumerate() {
                    deploy_exe_play_l2(
                        format!("{name}_s{i}"),
                        ep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    )
                    .await?;
                }
            }
            ExePlayL2::Parallel(eps) => {
                let mut set = JoinSet::new();
                for (i, ep) in eps.into_iter().enumerate() {
                    set.spawn(deploy_exe_play_l2(
                        format!("{name}_p{i}"),
                        ep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    ));
                }

                // Wait for every task so that all failures are reported at
                // once; only a join error cuts the wait short.
                let mut errors: Vec<Box<dyn std::error::Error + Send + Sync>> = Vec::new();
                while let Some(joined) = set.join_next().await {
                    match joined {
                        Err(join_err) => return Err(DeployL2Error::Other(join_err.into())),
                        Ok(Err(e)) => errors.push(e.into()),
                        Ok(Ok(())) => {}
                    }
                }
                if !errors.is_empty() {
                    return Err(DeployL2Error::Parallel { errors });
                }
            }
            ExePlayL2::Single(play_l2) => {
                let inv_root = play_l2.hosts.to_inventory_root()?;
                let play = play_l2.try_play()?;

                // Append ".json" to the name instead of using
                // `with_extension`: the latter replaces everything after the
                // last '.' of the name, so names that contain a dot (e.g.
                // "web.v2_s0" and "web.v2_s1") would all collapse onto the
                // same file and overwrite each other.
                let pb_path_j = playbook_dir.join(format!("{name}.json"));
                dump_json(
                    pb_path_j.clone(),
                    vec![Play {
                        // Tag the play with its generated name so it can be
                        // told apart in the command output.
                        name: format!("{} ({name})", play.name),
                        ..play
                    }],
                )
                .await?;
                json_to_yaml(pb_path_j.clone()).await?;

                let inv_path_j = inventory_dir.join(format!("{name}.json"));
                dump_json(inv_path_j.clone(), inv_root).await?;
                json_to_yaml(inv_path_j.clone()).await?;

                // Synth mode stops once the files are on disk.
                if deploy_config.synth {
                    return Ok(());
                }

                // NOTE(review): assumes `json_to_yaml` writes next to its input
                // with the extension swapped to "yaml" - confirm in utils.
                let playbook_cmd_args = deploy_config
                    .playbook_command
                    .clone()
                    .into_iter()
                    .chain([
                        "-i".to_owned(),
                        inv_path_j
                            .with_extension("yaml")
                            .to_string_lossy()
                            .to_string(),
                        pb_path_j
                            .with_extension("yaml")
                            .to_string_lossy()
                            .to_string(),
                    ])
                    .collect::<Vec<_>>();

                // The permit is held until the end of this branch, i.e. for
                // the whole lifetime of the child process.
                let _permit = cmd_semaphore
                    .acquire_owned()
                    .await
                    .with_context(|| "acquiring semaphore")?;
                let output = Command::new(
                    playbook_cmd_args
                        .first()
                        .with_context(|| "getting 1st playbook command")?,
                )
                .args(playbook_cmd_args.get(1..).unwrap_or_default())
                .output()
                .await
                .with_context(|| {
                    format!(
                        "running ansible-playbook: {}",
                        deploy_config.playbook_command.join(" ")
                    )
                })?;
                if !output.status.success() {
                    return Err(DeployL2Error::Command {
                        command: playbook_cmd_args,
                        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
                        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
                        status: output.status,
                    });
                }
                println!(
                    "stdout for '{}'\n{}",
                    playbook_cmd_args.join(" "),
                    String::from_utf8_lossy(&output.stdout),
                );
                println!(
                    "stderr for '{}'\n{}",
                    playbook_cmd_args.join(" "),
                    String::from_utf8_lossy(&output.stderr),
                );
            }
        }
        Ok(())
    }
    .boxed()
}
/// Errors produced while deploying plays.
#[derive(Error, Debug)]
enum DeployL2Error {
/// The playbook command ran but exited unsuccessfully; carries the full
/// argument list, the captured output (lossily decoded as UTF-8) and the
/// exit status.
#[error(
"failed to run ansible-playbook\n-- command --\n{command:?}\n-- stdout --\n{stdout}\n-- stderr --\n{stderr}\n-- status --\n{status:?}"
)]
Command {
command: Vec<String>,
stdout: String,
stderr: String,
status: ExitStatus,
},
/// One or more members of a parallel group failed. The message lists the
/// `Display` output of every collected error, one per line.
#[error(
"failed to run ansible-playbook in parallel:\n{}",
errors.iter().map(|e| format!("{e}")).collect::<Vec<_>>().join("\n")
)]
Parallel {
errors: Vec<Box<dyn std::error::Error + Send + Sync>>,
},
/// Any other failure, wrapped as an `anyhow::Error`; displayed exactly like
/// the wrapped error.
#[error(transparent)]
Other(#[from] anyhow::Error),
}