use anyhow::{Context, bail};
use manta_backend_dispatcher::{
interfaces::{
cfs::CfsTrait,
hsm::{component::ComponentTrait, group::GroupTrait},
},
types::{Group, K8sDetails, cfs::session::CfsSessionGetResponse},
};
use crate::{
common::{
self,
cfs_session_utils::check_cfs_session_against_groups_available,
},
manta_backend_dispatcher::StaticBackendDispatcher,
};
use futures::{AsyncBufReadExt, TryStreamExt};
/// Prints the logs of a single CFS session to stdout.
///
/// `hosts_expression` is interpreted in two steps:
///
/// 1. It is first resolved as a hosts expression against the node metadata
///    available to the caller.
///    * Exactly one node: the CFS sessions are fetched through
///      `get_and_filter_sessions`, restricted to that node and to the groups
///      available to the caller.
///    * More than one node: the function fails, since logs can only be shown
///      for a single node.
/// 2. If the expression resolves to no nodes, or cannot be resolved at all,
///    the raw `hosts_expression` is used as a CFS session name and looked up
///    through `get_sessions`.
///
/// The first session returned by the backend is the one whose logs are
/// printed. `site_name`, `timestamps` and `k8s` are forwarded unchanged to
/// [`print_cfs_session_logs`].
///
/// # Errors
///
/// Returns an error when:
/// * the available groups or the node metadata cannot be fetched,
/// * `hosts_expression` resolves to more than one node,
/// * the CFS sessions cannot be fetched, or none is found,
/// * the log stream cannot be opened or read.
pub async fn exec(
  backend: &StaticBackendDispatcher,
  site_name: &str,
  token: &str,
  shasta_base_url: &str,
  shasta_root_cert: &[u8],
  hosts_expression: &str,
  timestamps: bool,
  k8s: &K8sDetails,
) -> Result<(), anyhow::Error> {
  let group_available_vec = backend.get_group_name_available(token).await?;

  let node_metadata_available_vec = backend
    .get_node_metadata_available(token)
    .await
    .context("Could not get node metadata")?;

  // The resolution error is intentionally not propagated: a failure here
  // means the input is treated as a CFS session name below.
  let xname_vec_rslt = common::node_ops::from_hosts_expression_to_xname_vec(
    hosts_expression,
    false,
    node_metadata_available_vec,
  )
  .await;

  let cfs_sessions_vec = match xname_vec_rslt.as_deref() {
    Ok([]) | Err(_) => {
      tracing::debug!(
        "User input is not a node. Checking user input as CFS session name"
      );
      backend
        .get_sessions(
          token,
          shasta_base_url,
          shasta_root_cert,
          Some(&hosts_expression.to_string()),
          None,
          None,
          None,
          None,
          None,
          None,
          None,
          None,
        )
        .await
    }
    Ok([xname]) => {
      tracing::debug!("User input is a single node");
      backend
        .get_and_filter_sessions(
          token,
          shasta_base_url,
          shasta_root_cert,
          // Cloned because the group names are consumed further down to
          // build the `Group` list.
          group_available_vec.clone(),
          vec![xname],
          None,
          None,
          None,
          None,
          None,
          None,
          None,
        )
        .await
    }
    // The single-node case is handled above, so this arm only sees two or
    // more nodes.
    Ok([_, ..]) => {
      tracing::debug!("User input is a list of nodes");
      bail!("Can only operate a single node");
    }
  }
  .context("Could not get CFS sessions")?;

  // Only the first session returned by the backend is used from here on.
  let cfs_session = cfs_sessions_vec.first().ok_or_else(|| {
    anyhow::anyhow!("No CFS session found for the given input")
  })?;

  tracing::info!(
    "Get logs for CFS session:\n{}",
    crate::cli::output::session::get_table_struct(&cfs_sessions_vec)
  );

  // The group names are not needed after this point, so they are moved into
  // the `Group` values instead of being cloned one by one.
  let group_available_vec_group = group_available_vec
    .into_iter()
    .map(|label| Group {
      label,
      description: None,
      tags: None,
      members: None,
      exclusive_group: None,
    })
    .collect();

  // The session is passed by reference straight from the vector; no copy of
  // it is required.
  // NOTE(review): whatever this helper returns is discarded here — confirm it
  // reports a mismatch on its own.
  check_cfs_session_against_groups_available(
    cfs_session,
    group_available_vec_group,
  );

  print_cfs_session_logs(
    backend,
    token,
    site_name,
    cfs_session.name.as_str(),
    timestamps,
    k8s,
  )
  .await
}
/// Fetches the log stream of the CFS session named `cfs_session_name` and
/// writes it to stdout, one line at a time.
///
/// `shasta_token`, `site_name`, `timestamps` and `k8s` are handed unchanged to
/// the backend's `get_session_logs_stream`.
///
/// # Errors
///
/// Returns an error if the backend cannot provide the log stream, or if
/// reading a line from the stream fails (reported with the context
/// "Error reading log stream").
pub async fn print_cfs_session_logs(
  backend: &StaticBackendDispatcher,
  shasta_token: &str,
  site_name: &str,
  cfs_session_name: &str,
  timestamps: bool,
  k8s: &K8sDetails,
) -> Result<(), anyhow::Error> {
  let mut log_lines = backend
    .get_session_logs_stream(
      shasta_token,
      site_name,
      cfs_session_name,
      timestamps,
      k8s,
    )
    .await?
    .lines();

  loop {
    let next_line = log_lines
      .try_next()
      .await
      .context("Error reading log stream")?;

    match next_line {
      Some(log_line) => println!("{}", log_line),
      // The stream is exhausted: every line has been printed.
      None => return Ok(()),
    }
  }
}