manta-cli 2.0.0-beta.7

Another CLI for ALPS
//! Implements the `manta apply session` command.

use std::path::PathBuf;

use anyhow::{Context, Error, bail};
use clap::ArgMatches;

use crate::cli::common::{local_git_repo, user_interaction};
use crate::cli::http_client::MantaClient;
use manta_shared::common::app_context::AppContext;
use manta_shared::common::audit;

/// Gitea repository name prefix used by CFS.
const GITEA_REPO_NAME_PREFIX: &str = "cray/";

/// Create and run a CFS session on target nodes.
///
/// Authorization (target HSM group access + every xname in
/// `--ansible-limit` belonging to an accessible group) is enforced
/// server-side by `POST /api/v1/sessions`.
pub async fn exec(
  cli_apply_session: &ArgMatches,
  ctx: &AppContext<'_>,
  token: &str,
) -> Result<(), Error> {
  let repo_path_vec: Vec<PathBuf> = cli_apply_session
    .get_many("repo-path")
    .context("'repo-path' argument not provided")?
    .cloned()
    .collect();

  let hsm_group_name_arg_opt: Option<&str> = cli_apply_session
    .get_one::<String>("hsm-group")
    .map(String::as_str);

  let cfs_conf_sess_name_opt: Option<&String> =
    cli_apply_session.get_one("name");
  let playbook_file_name_opt: Option<&String> =
    cli_apply_session.get_one("playbook-name");

  let hsm_group_members_opt: Option<&str> = cli_apply_session
    .get_one("ansible-limit")
    .map(String::as_str);
  let ansible_verbosity: Option<&String> =
    cli_apply_session.get_one("ansible-verbosity");

  let ansible_passthrough: Option<&String> =
    cli_apply_session.get_one("ansible-passthrough");

  let watch_logs: bool = cli_apply_session.get_flag("watch-logs");
  let timestamps: bool = cli_apply_session.get_flag("timestamps");

  let _ = apply_session(
    ctx,
    token,
    cfs_conf_sess_name_opt.map(String::as_str),
    playbook_file_name_opt.map(String::as_str),
    hsm_group_name_arg_opt,
    &repo_path_vec,
    hsm_group_members_opt,
    ansible_verbosity.map(String::as_str),
    ansible_passthrough.map(String::as_str),
    watch_logs,
    timestamps,
  )
  .await?;

  Ok(())
}

/// Creates a CFS session target dynamic
/// Returns a tuple like
/// (<cfs configuration name>, <cfs session name>)
#[allow(clippy::too_many_arguments)]
async fn apply_session(
  ctx: &AppContext<'_>,
  shasta_token: &str,
  cfs_conf_sess_name: Option<&str>,
  playbook_yaml_file_name_opt: Option<&str>,
  hsm_group_opt: Option<&str>,
  repos_paths: &[PathBuf],
  ansible_limit_opt: Option<&str>,
  ansible_verbosity: Option<&str>,
  ansible_passthrough: Option<&str>,
  watch_logs: bool,
  timestamps: bool,
) -> Result<(String, String), Error> {
  let server_url = ctx.manta_server_url;
  let kafka_audit_opt = ctx.kafka_audit_opt;

  // Check local repos (user interaction: confirm dialogs)
  let (repo_name_vec, repo_last_commit_id_vec) =
    check_local_repos(repos_paths)?;

  // Create CFS session via server
  let (cfs_configuration_name, cfs_session_name) =
    MantaClient::new(server_url, ctx.site_name)?
      .create_session(
        shasta_token,
        cfs_conf_sess_name,
        playbook_yaml_file_name_opt,
        hsm_group_opt,
        &repo_name_vec
          .iter()
          .map(|s| s.as_str())
          .collect::<Vec<&str>>(),
        &repo_last_commit_id_vec
          .iter()
          .map(|s| s.as_str())
          .collect::<Vec<&str>>(),
        ansible_limit_opt,
        ansible_verbosity,
        ansible_passthrough,
      )
      .await?;

  // Watch logs (CLI concern: println)
  if watch_logs {
    tracing::info!("Fetching logs ...");

    use tokio::io::AsyncBufReadExt as _;
    let client = MantaClient::new(server_url, ctx.site_name)?;
    let reader = client
      .stream_session_logs(shasta_token, &cfs_session_name, timestamps)
      .await
      .context("Failed to get CFS session log stream from server")?;
    let mut lines = reader.lines();
    while let Some(raw) = lines
      .next_line()
      .await
      .context("Failed to read CFS session log stream")?
    {
      if let Some(content) = raw.strip_prefix("data: ") {
        println!("{}", content);
      }
    }
  }

  // Audit
  audit::maybe_send_audit(
    kafka_audit_opt,
    shasta_token,
    "Apply session",
    Some(serde_json::json!(ansible_limit_opt)),
    Some(serde_json::json!(vec![hsm_group_opt])),
  )
  .await;

  Ok((cfs_configuration_name, cfs_session_name))
}

fn check_local_repos(
  repos: &[PathBuf],
) -> Result<(Vec<String>, Vec<String>), Error> {
  let mut layers_summary = vec![];
  let mut repo_name_vec = Vec::new();
  let mut repo_last_commit_id_vec = Vec::new();

  for (i, repo_path) in repos.iter().enumerate() {
    let repo = match local_git_repo::get_repo(&repo_path.to_string_lossy()) {
      Ok(repo) => repo,
      Err(_) => {
        bail!(
          "Could not find a git repo in {}",
          repo_path.to_string_lossy()
        );
      }
    };

    let local_last_commit = local_git_repo::get_last_commit(&repo)
      .with_context(|| {
        format!(
          "Could not get last commit from repo at {}",
          repo_path.display()
        )
      })?;

    tracing::info!("Checking local repo status ({})", &repo.path().display());

    let all_committed = local_git_repo::untracked_changed_local_files(&repo)
      .map_err(|e| Error::msg(e.to_string()))
      .with_context(|| {
        format!(
          "Could not check local repo status at {}",
          repo_path.display()
        )
      })?;

    if !all_committed {
      if user_interaction::confirm(
        "Your local repo has uncommitted changes. Do you want to continue?",
        false,
      ) {
        println!(
          "Continue. Checking commit id {} against remote",
          local_last_commit.id()
        );
      } else {
        bail!("Operation cancelled by user");
      }
    }

    let repo_name_raw = local_git_repo::parse_repo_name_from_remote(&repo)
      .with_context(|| {
        format!(
          "Could not extract repo name from remote in {}",
          repo_path.display()
        )
      })?;

    let timestamp = local_last_commit.time().seconds();
    let tm = chrono::DateTime::from_timestamp(timestamp, 0)
      .context("Could not parse commit timestamp")?;

    tracing::debug!(
      "\n\nCommit details to apply to CFS layer:\nCommit  {}\nAuthor: {}\nDate:   {}\n\n    {}\n",
      local_last_commit.id(),
      local_last_commit.author(),
      tm,
      local_last_commit.message().unwrap_or("no commit message")
    );

    layers_summary.push(vec![
      i.to_string(),
      repo_name_raw.to_string(),
      all_committed.to_string(),
    ]);

    repo_last_commit_id_vec.push(local_last_commit.id().to_string());

    let repo_name = GITEA_REPO_NAME_PREFIX.to_owned() + &repo_name_raw;
    repo_name_vec.push(repo_name);
  }

  // Print CFS session/configuration layers summary
  println!("Please review the following CFS layers:");
  for layer_summary in layers_summary {
    let layer_num = layer_summary.first().map(|s| s.as_str()).unwrap_or("?");
    let repo_name = layer_summary
      .get(1)
      .map(|s| s.as_str())
      .unwrap_or("unknown");
    let committed = layer_summary
      .get(2)
      .map(|s| s.as_str())
      .unwrap_or("unknown");
    println!(
      " - Layer-{}; repo name: {}; local changes committed: {}",
      layer_num, repo_name, committed
    );
  }

  if user_interaction::confirm(
    "Please review the layers and its order and confirm if proceed. Do you want to continue?",
    false,
  ) {
    println!("Continue. Creating new CFS configuration and layer(s)");
  } else {
    bail!("Operation cancelled by user");
  }

  Ok((repo_name_vec, repo_last_commit_id_vec))
}