use anyhow::{Context, bail};
use futures::TryStreamExt;
use serde_json::Value;
use tokio::io::{AsyncBufRead, BufReader};
use tokio_util::io::StreamReader;
use manta_shared::shared::dto::CfsSessionGetResponse;
use manta_shared::shared::params::session::GetSessionParams;
use super::{MantaClient, QueryBuilder};
impl MantaClient {
    /// Lists sessions, filtered by the criteria carried in `params`.
    ///
    /// Each filter field of `params` is handed to [`QueryBuilder`] under its
    /// query-string key, and the built query is passed to `get_json` for the
    /// `/sessions` path.
    ///
    /// * `token` - credential forwarded unchanged to `get_json`.
    /// * `params` - filter values (`hsm_group`, `xnames`, `min_age`,
    ///   `max_age`, `session_type`, `status`, `name`, `limit`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_json`.
    pub async fn get_sessions(
        &self,
        token: &str,
        params: &GetSessionParams,
    ) -> anyhow::Result<Vec<CfsSessionGetResponse>> {
        // NOTE(review): `opt` / `vec` / `opt_display` presumably omit unset or
        // empty values from the query — confirm against `QueryBuilder`.
        let q = QueryBuilder::new()
            .opt("hsm_group", &params.hsm_group)
            .vec("xnames", &params.xnames)
            .opt("min_age", &params.min_age)
            .opt("max_age", &params.max_age)
            .opt("session_type", &params.session_type)
            .opt("status", &params.status)
            .opt("name", &params.name)
            .opt_display("limit", &params.limit)
            .build();
        self.get_json(token, "/sessions", &q).await
    }

    /// Requests creation of a session and returns the names reported back.
    ///
    /// All arguments other than `token` are placed, under keys of the same
    /// name, into a JSON object that is sent through `post_json` to
    /// `/sessions`. Arguments that are `None` are serialized as JSON `null`.
    ///
    /// Returns `(session_name, configuration_name)`, taken from the
    /// `session_name` and `configuration_name` string fields of the response.
    ///
    /// # Errors
    ///
    /// Fails if `post_json` fails, or if either expected field is absent from
    /// the response or is not a JSON string.
    // The parameter list mirrors the request body one field per argument,
    // which exceeds clippy's default argument-count threshold.
    #[allow(clippy::too_many_arguments)]
    pub async fn create_session(
        &self,
        token: &str,
        cfs_conf_sess_name: Option<&str>,
        playbook_yaml_file_name: Option<&str>,
        hsm_group: Option<&str>,
        repo_names: &[&str],
        repo_last_commit_ids: &[&str],
        ansible_limit: Option<&str>,
        ansible_verbosity: Option<&str>,
        ansible_passthrough: Option<&str>,
    ) -> anyhow::Result<(String, String)> {
        let body = serde_json::json!({
            "cfs_conf_sess_name": cfs_conf_sess_name,
            "playbook_yaml_file_name": playbook_yaml_file_name,
            "hsm_group": hsm_group,
            "repo_names": repo_names,
            "repo_last_commit_ids": repo_last_commit_ids,
            "ansible_limit": ansible_limit,
            "ansible_verbosity": ansible_verbosity,
            "ansible_passthrough": ansible_passthrough,
        });
        let resp: Value = self.post_json(token, "/sessions", &body).await?;

        // Indexing a `Value` with a missing key yields `Value::Null`, so a
        // missing field and a non-string field both surface as the same error.
        let session_name = resp["session_name"]
            .as_str()
            .context("missing session_name in response")?
            .to_string();
        let config_name = resp["configuration_name"]
            .as_str()
            .context("missing configuration_name in response")?
            .to_string();
        Ok((session_name, config_name))
    }

    /// Issues a delete for the session called `name`.
    ///
    /// The request goes through `delete_json_with_query` to
    /// `/sessions/{name}` with a single `dry_run` query parameter whose value
    /// is `"true"` or `"false"`. The JSON response is returned as-is.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `delete_json_with_query`.
    pub async fn delete_session(
        &self,
        token: &str,
        name: &str,
        dry_run: bool,
    ) -> anyhow::Result<Value> {
        let q = [("dry_run", dry_run.to_string())];
        // NOTE(review): `name` is interpolated into the path without
        // escaping — confirm callers only pass path-safe session names.
        self.delete_json_with_query(token, &format!("/sessions/{name}"), &q)
            .await
    }

    /// Opens the log stream of a session and exposes it as a buffered
    /// asynchronous reader.
    ///
    /// Sends a GET to `{base_url}/sessions/{session_name}/logs` with `token`
    /// as bearer authentication, an `X-Manta-Site` header set from
    /// `site_name()`, and a `timestamps` query parameter (`"true"` or
    /// `"false"`). The response body is not collected here; it is read
    /// incrementally through the returned reader.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, or if the response status is not
    /// a success status; in the latter case the error message contains the
    /// status and whatever response body could be read.
    pub async fn stream_session_logs(
        &self,
        token: &str,
        session_name: &str,
        timestamps: bool,
    ) -> anyhow::Result<impl AsyncBufRead + Send + Unpin> {
        // NOTE(review): `session_name` is interpolated into the URL without
        // escaping — confirm callers only pass path-safe session names.
        let url = format!("{}/sessions/{}/logs", self.base_url(), session_name);
        let builder = self
            .http_client()
            .get(&url)
            .bearer_auth(token)
            .header("X-Manta-Site", self.site_name())
            .query(&[("timestamps", timestamps.to_string())]);
        Self::log_request_as_curl(&builder);

        let resp = builder
            .send()
            .await
            .context("HTTP GET session logs failed")?;
        if !resp.status().is_success() {
            let status = resp.status();
            // Best effort: the body only enriches the error message, so a
            // failure to read it falls back to an empty string.
            let body = resp.text().await.unwrap_or_default();
            bail!("GET session logs returned {status}: {body}");
        }

        // `StreamReader` needs the stream's error type to convert into
        // `std::io::Error`, hence the mapping of the HTTP client's errors.
        let byte_stream = resp.bytes_stream().map_err(std::io::Error::other);
        Ok(BufReader::new(StreamReader::new(byte_stream)))
    }
}