cdk-ansible 0.3.7

cdk-ansible is a tool to generate Ansible playbooks from Rust code.
Documentation
use crate::{
    l2::{
        deploy::{AppL2, cli::GlobalConfig},
        types::{ExePlayL2, LazyExePlayL2},
    },
    types::StackName,
    utils::{dump_json, json_to_yaml},
};
use anyhow::{Context as _, Result};
use cdk_ansible_core::core::Play;
use clap::Args;
use futures::future::{BoxFuture, FutureExt as _};
use std::{path::PathBuf, process::ExitStatus, sync::Arc};
use thiserror::Error;
use tokio::{fs, process::Command, sync::Semaphore, task::JoinSet};

// Command-line arguments for a deploy run, parsed through clap's `Args` derive.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// `--help` text, so they are user-facing strings; reword them with care.
// Plain `//` comments (like this one) are not shown to users.
#[derive(Args, Debug, Clone)]
pub struct Deploy {
    // Stored as a single string here; it is split into program + arguments
    // with shlex when the `DeployConfig` is built.
    /// The command to run the playbook. This string is parsed by shlex.
    ///
    /// The first argument is the command name, and the rest are arguments.
    ///
    /// The default is `ansible-playbook`.
    ///
    /// Example: If you want to run `uv run ansible-playbook -v some_playbook`, pass `uv run ansible-playbook -v` to [`Deploy::playbook_command`].
    #[arg(
        short = 'c',
        long,
        required = false,
        default_value = "ansible-playbook"
    )]
    pub playbook_command: String,
    // Used as the permit count of the semaphore that gates playbook processes.
    /// The maximum number of playbook processes.
    #[arg(short = 'P', long, required = false, default_value = "2")]
    pub max_procs: usize,
    // `exclusive = true`: per clap, this flag must be passed on its own,
    // without any other argument of this struct.
    /// Only synthesize playbooks and inventories.
    #[arg(long, exclusive = true, default_value = "false")]
    pub synth: bool,
    // No `short`/`long` is declared, so clap's derive treats this field as an
    // optional positional argument.
    /// The stack name to deploy.
    /// If not specified, all stacks will be deployed.
    // TODO: support multiple stacks
    pub stack_name: Option<String>,
}

impl Deploy {
    /// Consumes the parsed CLI arguments and performs the deployment.
    ///
    /// The arguments are first normalized into a `DeployConfig` (which splits
    /// the playbook command with shlex); the result is then shared behind an
    /// `Arc` and handed to the module-level `deploy` routine together with the
    /// application and the global configuration.
    ///
    /// # Errors
    ///
    /// Fails if the arguments cannot be converted into a `DeployConfig` or if
    /// the deployment itself reports an error.
    pub async fn run(self, app: &AppL2, global_config: Arc<GlobalConfig>) -> Result<()> {
        let shared_config = DeployConfig::new(self).map(Arc::new)?;
        deploy(app, &global_config, &shared_config).await
    }
}

/// Normalized deploy settings derived from the `Deploy` CLI arguments.
#[derive(Debug, Clone)]
struct DeployConfig {
    /// The playbook command, already split by shlex: program first, then its arguments.
    playbook_command: Vec<String>,
    /// Permit count of the semaphore that limits concurrent playbook processes.
    max_procs: usize,
    /// When `true`, playbooks and inventories are written but the playbook command is not run.
    synth: bool,
    /// The single stack to deploy; `None` deploys every stack of the app.
    stack_name: Option<StackName>,
}

impl DeployConfig {
    pub fn new(args: Deploy) -> Result<Self> {
        Ok(Self {
            playbook_command: ::shlex::split(&args.playbook_command)
                .with_context(|| "parsing playbook command")?,
            max_procs: args.max_procs,
            synth: args.synth,
            stack_name: args.stack_name.map(|s| StackName::from(s.as_str())),
        })
    }
}

/// Deploys the selected stacks of `app`, one stack after another.
///
/// Steps:
/// 1. Remove the playbook and inventory output directories left over from a
///    previous run (a missing directory is not an error).
/// 2. Select the stacks: only `deploy_config.stack_name` when it is set,
///    otherwise every stack in the app's stack container.
/// 3. For each stack, walk its play tree with `recursive_deploy`. The stack
///    name, lowercased and with spaces replaced by `_`, is the base name of
///    the generated files.
///
/// All playbook processes of the whole run share one semaphore with
/// `deploy_config.max_procs` permits.
///
/// # Errors
///
/// Fails if an output directory cannot be removed, if the requested stack
/// cannot be fetched, or if deploying any stack fails; the first failing stack
/// stops the run.
async fn deploy(
    app: &AppL2,
    global_config: &Arc<GlobalConfig>,
    deploy_config: &Arc<DeployConfig>,
) -> Result<()> {
    let playbook_dir = Arc::new(global_config.playbook_dir.clone());
    let inventory_dir = Arc::new(global_config.inventory_dir.clone());

    // Start from a clean slate so stale playbooks/inventories never linger.
    remove_dir_if_present(playbook_dir.as_path()).await?;
    remove_dir_if_present(inventory_dir.as_path()).await?;

    // Semaphore for limiting the number of concurrent ansible-playbook processes
    let cmd_semaphore = Arc::new(Semaphore::new(deploy_config.max_procs));

    let stacks: Vec<_> = match deploy_config.stack_name.as_ref() {
        // Only the explicitly requested stack.
        Some(n) => vec![
            app.inner
                .stack_container
                .get_stack(n)
                .with_context(|| format!("getting stack: {n}"))?,
        ],
        // No stack requested: deploy all of them.
        None => app.inner.stack_container.get_stacks().collect(),
    };

    for stack in stacks {
        recursive_deploy(
            stack.name().to_string().to_lowercase().replace(' ', "_"),
            stack.exe_play().clone(),
            Arc::clone(&playbook_dir),
            Arc::clone(&inventory_dir),
            Arc::clone(deploy_config),
            Arc::clone(&cmd_semaphore),
        )
        .await?;
    }

    Ok(())
}

/// Recursively removes `dir`, treating "it does not exist" as success.
///
/// Removing directly and tolerating `NotFound` avoids both a blocking
/// `Path::exists` call inside async code and the race between checking for the
/// directory and deleting it.
async fn remove_dir_if_present(dir: &std::path::Path) -> Result<()> {
    match fs::remove_dir_all(dir).await {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e).with_context(|| format!("removing directory: {}", dir.display())),
    }
}

/// Walks a lazy play tree and deploys every leaf it contains.
///
/// * `Sequential` children are deployed one after another; the first failure
///   stops the sequence. Child `i` is named `{name}_s{i}`.
/// * `Parallel` children are spawned as separate tasks and named
///   `{name}_p{i}`. Every task is awaited before the outcome is evaluated, so
///   one failing branch does not cancel its siblings.
/// * `Single` resolves the lazy play and hands the result to
///   `deploy_exe_play_l2` under the current `name`.
///
/// The future is boxed because the function calls itself recursively.
///
/// # Errors
///
/// * Whatever a sequential child or the single leaf reports.
/// * `DeployL2Error::Parallel` carrying every failure of a parallel group,
///   including tasks that panicked or were cancelled.
fn recursive_deploy(
    name: String,
    lazy_exe_play: LazyExePlayL2,
    playbook_dir: Arc<PathBuf>,
    inventory_dir: Arc<PathBuf>,
    deploy_config: Arc<DeployConfig>,
    cmd_semaphore: Arc<Semaphore>,
) -> BoxFuture<'static, std::result::Result<(), DeployL2Error>> {
    async move {
        match lazy_exe_play {
            LazyExePlayL2::Sequential(leps) => {
                for (i, lep) in leps.into_iter().enumerate() {
                    recursive_deploy(
                        format!("{name}_s{i}"),
                        lep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    )
                    .await?;
                }
            }
            LazyExePlayL2::Parallel(leps) => {
                let mut set = JoinSet::new();
                for (i, lep) in leps.into_iter().enumerate() {
                    set.spawn(recursive_deploy(
                        format!("{name}_p{i}"),
                        lep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    ));
                }

                // Wait for all tasks to complete and then check the results.
                // A join failure (panicked or cancelled task) is recorded like
                // any other error instead of being returned right away:
                // returning mid-loop would drop the `JoinSet`, which aborts
                // every task that is still running.
                let mut errors: Vec<Box<dyn std::error::Error + Send + Sync>> = Vec::new();
                while let Some(joined) = set.join_next().await {
                    match joined {
                        Ok(Ok(())) => {}
                        Ok(Err(deploy_err)) => errors.push(deploy_err.into()),
                        Err(join_err) => errors.push(join_err.into()),
                    }
                }
                if !errors.is_empty() {
                    return Err(DeployL2Error::Parallel { errors });
                }
            }
            LazyExePlayL2::Single(lp) => {
                let ep = lp.lazy_play_l2().await?;
                deploy_exe_play_l2(
                    name,
                    ep,
                    playbook_dir,
                    inventory_dir,
                    deploy_config,
                    cmd_semaphore,
                )
                .await?;
            }
        }
        Ok(())
    }
    .boxed()
}

/// Synthesizes an executable play tree and, unless running in synth-only mode,
/// executes the playbook of every leaf.
///
/// * `Sequential` children run one after another (`{name}_s{i}`); the first
///   failure stops the sequence.
/// * `Parallel` children are spawned as separate tasks (`{name}_p{i}`). Every
///   task is awaited before the outcome is evaluated, so one failing branch
///   does not cancel its siblings.
/// * `Single` writes `<playbook_dir>/<name>.json` and
///   `<inventory_dir>/<name>.json`, converts both with `json_to_yaml`, and
///   then runs the configured playbook command with
///   `-i <inventory>.yaml <playbook>.yaml` appended. The command is only
///   started once a permit of `cmd_semaphore` is available. On success the
///   captured stdout and stderr are printed.
///
/// The future is boxed because the function calls itself recursively.
///
/// # Errors
///
/// * `DeployL2Error::Command` when the playbook command exits unsuccessfully.
/// * `DeployL2Error::Parallel` carrying every failure of a parallel group,
///   including tasks that panicked or were cancelled.
/// * Any other failure (building the inventory or play, writing files,
///   spawning the command) is propagated.
fn deploy_exe_play_l2(
    name: String,
    exe_play: ExePlayL2,
    playbook_dir: Arc<PathBuf>,
    inventory_dir: Arc<PathBuf>,
    deploy_config: Arc<DeployConfig>,
    cmd_semaphore: Arc<Semaphore>,
) -> BoxFuture<'static, std::result::Result<(), DeployL2Error>> {
    async move {
        match exe_play {
            ExePlayL2::Sequential(eps) => {
                for (i, ep) in eps.into_iter().enumerate() {
                    deploy_exe_play_l2(
                        format!("{name}_s{i}"),
                        ep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    )
                    .await?;
                }
            }
            ExePlayL2::Parallel(eps) => {
                let mut set = JoinSet::new();
                for (i, ep) in eps.into_iter().enumerate() {
                    set.spawn(deploy_exe_play_l2(
                        format!("{name}_p{i}"),
                        ep,
                        Arc::clone(&playbook_dir),
                        Arc::clone(&inventory_dir),
                        Arc::clone(&deploy_config),
                        Arc::clone(&cmd_semaphore),
                    ));
                }

                // Wait for all tasks to complete and then check the results.
                // This means that, in parallel execution, if one of the tasks fails,
                // the other tasks will continue to run.
                // A join failure (panicked or cancelled task) is therefore
                // recorded like any other error instead of being returned
                // right away: returning mid-loop would drop the `JoinSet`,
                // which aborts every task that is still running.
                let mut errors: Vec<Box<dyn std::error::Error + Send + Sync>> = Vec::new();
                while let Some(joined) = set.join_next().await {
                    match joined {
                        Ok(Ok(())) => {}
                        Ok(Err(deploy_err)) => errors.push(deploy_err.into()),
                        Err(join_err) => errors.push(join_err.into()),
                    }
                }
                if !errors.is_empty() {
                    return Err(DeployL2Error::Parallel { errors });
                }
            }
            ExePlayL2::Single(play_l2) => {
                let inv_root = play_l2.hosts.to_inventory_root()?;
                let play = play_l2.try_play()?;

                // Create playbook
                // The ".json" suffix is appended rather than set through
                // `with_extension`: the latter replaces everything after the
                // last '.', so names such as "v1.2_s0" and "v1.2_s1" would
                // both collapse to "v1.json" and overwrite each other.
                let pb_path_j = playbook_dir.join(format!("{name}.json"));
                dump_json(
                    pb_path_j.clone(),
                    vec![Play {
                        name: format!("{} ({name})", play.name),
                        ..play
                    }],
                )
                .await?;
                json_to_yaml(pb_path_j.clone()).await?;

                // Create inventory
                let inv_path_j = inventory_dir.join(format!("{name}.json"));
                dump_json(inv_path_j.clone(), inv_root).await?;
                json_to_yaml(inv_path_j.clone()).await?;

                if deploy_config.synth {
                    // Only synthesize playbooks and inventories.
                    return Ok(());
                }

                // Full command line: configured command, then the inventory
                // and playbook YAML files that sit next to the JSON files.
                let playbook_cmd_args = deploy_config
                    .playbook_command
                    .clone()
                    .into_iter()
                    .chain([
                        "-i".to_owned(),
                        inv_path_j
                            .with_extension("yaml")
                            .to_string_lossy()
                            .to_string(),
                        pb_path_j
                            .with_extension("yaml")
                            .to_string_lossy()
                            .to_string(),
                    ])
                    .collect::<Vec<_>>();

                // Held until the end of this arm so that the number of
                // concurrently running playbook processes stays bounded.
                let _permit = cmd_semaphore
                    .clone()
                    .acquire_owned()
                    .await
                    .with_context(|| "acquiring semaphore")?;
                let output = Command::new(
                    playbook_cmd_args
                        .first()
                        .with_context(|| "getting 1st playbook command")?,
                )
                .args(playbook_cmd_args.get(1..).unwrap_or_default())
                .output()
                .await
                .with_context(|| {
                    format!(
                        "running ansible-playbook: {}",
                        deploy_config.playbook_command.join(" ")
                    )
                })?;
                if !output.status.success() {
                    return Err(DeployL2Error::Command {
                        command: playbook_cmd_args,
                        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
                        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
                        status: output.status,
                    });
                }
                println!(
                    "stdout for '{}'\n{}",
                    playbook_cmd_args.join(" "),
                    String::from_utf8_lossy(&output.stdout),
                );
                println!(
                    "stderr for '{}'\n{}",
                    playbook_cmd_args.join(" "),
                    String::from_utf8_lossy(&output.stderr),
                );
            }
        }
        Ok(())
    }
    .boxed()
}

/// Errors produced while deploying a play tree.
#[derive(Error, Debug)]
enum DeployL2Error {
    /// The playbook command was executed but exited with a non-success status.
    #[error(
        "failed to run ansible-playbook\n-- command --\n{command:?}\n-- stdout --\n{stdout}\n-- stderr --\n{stderr}\n-- status --\n{status:?}"
    )]
    Command {
        /// The full argument vector that was executed, program first.
        command: Vec<String>,
        /// Captured standard output, lossily decoded as UTF-8.
        stdout: String,
        /// Captured standard error, lossily decoded as UTF-8.
        stderr: String,
        /// Exit status reported for the process.
        status: ExitStatus,
    },
    /// At least one branch of a parallel group failed.
    ///
    /// The display output lists the message of every collected error, one per line.
    #[error(
        "failed to run ansible-playbook in parallel:\n{}",
        errors.iter().map(|e| format!("{e}")).collect::<Vec<_>>().join("\n")
    )]
    Parallel {
        /// The individual failures of the group, type-erased.
        errors: Vec<Box<dyn std::error::Error + Send + Sync>>,
    },
    /// Any other failure; displayed exactly like the wrapped `anyhow::Error`.
    ///
    /// `#[from]` lets `?` convert an `anyhow::Error` into this variant.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}