use std::path::PathBuf;
use anyhow::{Context, Error, bail};
use clap::ArgMatches;
use crate::cli::common::{local_git_repo, user_interaction};
use crate::cli::http_client::MantaClient;
use manta_shared::common::app_context::AppContext;
use manta_shared::common::audit;
/// Prefix prepended to each repo name parsed from a local git remote before
/// the name is handed back by `check_local_repos`.
// NOTE(review): presumably the Gitea organisation that hosts the repos —
// confirm against the server side.
const GITEA_REPO_NAME_PREFIX: &str = "cray/";
/// Handles the CLI request to apply a session.
///
/// Pulls every argument out of `cli_apply_session` and forwards them to
/// `apply_session`; the names that call returns are discarded.
///
/// # Errors
///
/// Fails when no `repo-path` value is present in the matches, or when
/// `apply_session` itself returns an error.
pub async fn exec(
    cli_apply_session: &ArgMatches,
    ctx: &AppContext<'_>,
    token: &str,
) -> Result<(), Error> {
    // Reads an optional `String` argument and borrows it as `&str`.
    fn opt_str<'a>(matches: &'a ArgMatches, id: &str) -> Option<&'a str> {
        matches.get_one::<String>(id).map(String::as_str)
    }

    // `repo-path` is read first so a missing value is reported before any
    // other argument is touched.
    let repo_paths = cli_apply_session
        .get_many::<PathBuf>("repo-path")
        .context("'repo-path' argument not provided")?
        .cloned()
        .collect::<Vec<_>>();

    let hsm_group = opt_str(cli_apply_session, "hsm-group");
    let session_name = opt_str(cli_apply_session, "name");
    let playbook = opt_str(cli_apply_session, "playbook-name");
    let ansible_limit = opt_str(cli_apply_session, "ansible-limit");
    let verbosity = opt_str(cli_apply_session, "ansible-verbosity");
    let passthrough = opt_str(cli_apply_session, "ansible-passthrough");
    let follow_logs = cli_apply_session.get_flag("watch-logs");
    let with_timestamps = cli_apply_session.get_flag("timestamps");

    apply_session(
        ctx,
        token,
        session_name,
        playbook,
        hsm_group,
        &repo_paths,
        ansible_limit,
        verbosity,
        passthrough,
        follow_logs,
        with_timestamps,
    )
    .await
    .map(|_| ())
}
/// Creates a session on the manta server from the given local repos and,
/// when requested, prints the session's log stream to stdout.
///
/// Steps, in order:
/// 1. `check_local_repos` validates `repos_paths` (this prompts the user) and
///    yields the repo names and last commit ids.
/// 2. `MantaClient::create_session` is called with those values and the
///    remaining options.
/// 3. An "Apply session" audit event is emitted through
///    `audit::maybe_send_audit`.
/// 4. If `watch_logs` is set, the session log stream is read line by line and
///    the payload of every line starting with `data: ` is printed; other
///    lines are skipped.
///
/// Returns the pair produced by `create_session`, bound here as
/// `(configuration name, session name)`.
///
/// # Errors
///
/// Propagates failures from `check_local_repos`, from building a
/// `MantaClient`, from `create_session`, and from opening or reading the log
/// stream.
// One parameter per CLI option forwarded by `exec`, hence the long signature.
#[allow(clippy::too_many_arguments)]
async fn apply_session(
    ctx: &AppContext<'_>,
    shasta_token: &str,
    cfs_conf_sess_name: Option<&str>,
    playbook_yaml_file_name_opt: Option<&str>,
    hsm_group_opt: Option<&str>,
    repos_paths: &[PathBuf],
    ansible_limit_opt: Option<&str>,
    ansible_verbosity: Option<&str>,
    ansible_passthrough: Option<&str>,
    watch_logs: bool,
    timestamps: bool,
) -> Result<(String, String), Error> {
    let server_url = ctx.manta_server_url;
    let kafka_audit_opt = ctx.kafka_audit_opt;

    let (repo_name_vec, repo_last_commit_id_vec) =
        check_local_repos(repos_paths)?;

    // Borrowed views of the owned names / commit ids for the client call.
    let repo_names: Vec<&str> =
        repo_name_vec.iter().map(String::as_str).collect();
    let commit_ids: Vec<&str> =
        repo_last_commit_id_vec.iter().map(String::as_str).collect();

    let (cfs_configuration_name, cfs_session_name) =
        MantaClient::new(server_url, ctx.site_name)?
            .create_session(
                shasta_token,
                cfs_conf_sess_name,
                playbook_yaml_file_name_opt,
                hsm_group_opt,
                &repo_names,
                &commit_ids,
                ansible_limit_opt,
                ansible_verbosity,
                ansible_passthrough,
            )
            .await?;

    // Audit as soon as the session exists. Log streaming below can fail or be
    // interrupted by the user; doing the audit afterwards would silently drop
    // the record for a session that was in fact created.
    audit::maybe_send_audit(
        kafka_audit_opt,
        shasta_token,
        "Apply session",
        Some(serde_json::json!(ansible_limit_opt)),
        Some(serde_json::json!(vec![hsm_group_opt])),
    )
    .await;

    if watch_logs {
        use tokio::io::AsyncBufReadExt as _;

        tracing::info!("Fetching logs ...");

        let client = MantaClient::new(server_url, ctx.site_name)?;
        let reader = client
            .stream_session_logs(shasta_token, &cfs_session_name, timestamps)
            .await
            .context("Failed to get CFS session log stream from server")?;

        let mut lines = reader.lines();
        while let Some(raw) = lines
            .next_line()
            .await
            .context("Failed to read CFS session log stream")?
        {
            // Only lines carrying a `data: ` prefix hold log content.
            if let Some(content) = raw.strip_prefix("data: ") {
                println!("{}", content);
            }
        }
    }

    Ok((cfs_configuration_name, cfs_session_name))
}
fn check_local_repos(
repos: &[PathBuf],
) -> Result<(Vec<String>, Vec<String>), Error> {
let mut layers_summary = vec![];
let mut repo_name_vec = Vec::new();
let mut repo_last_commit_id_vec = Vec::new();
for (i, repo_path) in repos.iter().enumerate() {
let repo = match local_git_repo::get_repo(&repo_path.to_string_lossy()) {
Ok(repo) => repo,
Err(_) => {
bail!(
"Could not find a git repo in {}",
repo_path.to_string_lossy()
);
}
};
let local_last_commit = local_git_repo::get_last_commit(&repo)
.with_context(|| {
format!(
"Could not get last commit from repo at {}",
repo_path.display()
)
})?;
tracing::info!("Checking local repo status ({})", &repo.path().display());
let all_committed = local_git_repo::untracked_changed_local_files(&repo)
.map_err(|e| Error::msg(e.to_string()))
.with_context(|| {
format!(
"Could not check local repo status at {}",
repo_path.display()
)
})?;
if !all_committed {
if user_interaction::confirm(
"Your local repo has uncommitted changes. Do you want to continue?",
false,
) {
println!(
"Continue. Checking commit id {} against remote",
local_last_commit.id()
);
} else {
bail!("Operation cancelled by user");
}
}
let repo_name_raw = local_git_repo::parse_repo_name_from_remote(&repo)
.with_context(|| {
format!(
"Could not extract repo name from remote in {}",
repo_path.display()
)
})?;
let timestamp = local_last_commit.time().seconds();
let tm = chrono::DateTime::from_timestamp(timestamp, 0)
.context("Could not parse commit timestamp")?;
tracing::debug!(
"\n\nCommit details to apply to CFS layer:\nCommit {}\nAuthor: {}\nDate: {}\n\n {}\n",
local_last_commit.id(),
local_last_commit.author(),
tm,
local_last_commit.message().unwrap_or("no commit message")
);
layers_summary.push(vec![
i.to_string(),
repo_name_raw.to_string(),
all_committed.to_string(),
]);
repo_last_commit_id_vec.push(local_last_commit.id().to_string());
let repo_name = GITEA_REPO_NAME_PREFIX.to_owned() + &repo_name_raw;
repo_name_vec.push(repo_name);
}
println!("Please review the following CFS layers:");
for layer_summary in layers_summary {
let layer_num = layer_summary.first().map(|s| s.as_str()).unwrap_or("?");
let repo_name = layer_summary
.get(1)
.map(|s| s.as_str())
.unwrap_or("unknown");
let committed = layer_summary
.get(2)
.map(|s| s.as_str())
.unwrap_or("unknown");
println!(
" - Layer-{}; repo name: {}; local changes committed: {}",
layer_num, repo_name, committed
);
}
if user_interaction::confirm(
"Please review the layers and its order and confirm if proceed. Do you want to continue?",
false,
) {
println!("Continue. Creating new CFS configuration and layer(s)");
} else {
bail!("Operation cancelled by user");
}
Ok((repo_name_vec, repo_last_commit_id_vec))
}