manta-cli 2.0.0-beta.10

Another CLI for ALPS
//! CFS session endpoints: list, create, delete, stream logs.

use anyhow::{Context, bail};
use futures::TryStreamExt;
use serde_json::Value;
use tokio::io::{AsyncBufRead, BufReader};
use tokio_util::io::StreamReader;

use manta_shared::shared::dto::CfsSessionGetResponse;
use manta_shared::shared::params::session::GetSessionParams;

use super::{MantaClient, QueryBuilder};

impl MantaClient {
  pub async fn get_sessions(
    &self,
    token: &str,
    params: &GetSessionParams,
  ) -> anyhow::Result<Vec<CfsSessionGetResponse>> {
    let q = QueryBuilder::new()
      .opt("hsm_group", &params.hsm_group)
      .vec("xnames", &params.xnames)
      .opt("min_age", &params.min_age)
      .opt("max_age", &params.max_age)
      .opt("session_type", &params.session_type)
      .opt("status", &params.status)
      .opt("name", &params.name)
      .opt_display("limit", &params.limit)
      .build();
    self.get_json(token, "/sessions", &q).await
  }

  #[allow(clippy::too_many_arguments)]
  pub async fn create_session(
    &self,
    token: &str,
    cfs_conf_sess_name: Option<&str>,
    playbook_yaml_file_name: Option<&str>,
    hsm_group: Option<&str>,
    repo_names: &[&str],
    repo_last_commit_ids: &[&str],
    ansible_limit: Option<&str>,
    ansible_verbosity: Option<&str>,
    ansible_passthrough: Option<&str>,
  ) -> anyhow::Result<(String, String)> {
    let body = serde_json::json!({
      "cfs_conf_sess_name": cfs_conf_sess_name,
      "playbook_yaml_file_name": playbook_yaml_file_name,
      "hsm_group": hsm_group,
      "repo_names": repo_names,
      "repo_last_commit_ids": repo_last_commit_ids,
      "ansible_limit": ansible_limit,
      "ansible_verbosity": ansible_verbosity,
      "ansible_passthrough": ansible_passthrough,
    });
    let resp: Value = self.post_json(token, "/sessions", &body).await?;
    let session_name = resp["session_name"]
      .as_str()
      .context("missing session_name in response")?
      .to_string();
    let config_name = resp["configuration_name"]
      .as_str()
      .context("missing configuration_name in response")?
      .to_string();
    Ok((session_name, config_name))
  }

  pub async fn delete_session(
    &self,
    token: &str,
    name: &str,
    dry_run: bool,
  ) -> anyhow::Result<Value> {
    let q = [("dry_run", dry_run.to_string())];
    self
      .delete_json_with_query(token, &format!("/sessions/{}", name), &q)
      .await
  }

  /// Stream CFS session logs from `GET /sessions/{name}/logs` (SSE).
  ///
  /// Returns a buffered reader over the SSE byte stream.  The caller is
  /// responsible for stripping the `data: ` prefix that the server wraps
  /// around each log line.
  pub async fn stream_session_logs(
    &self,
    token: &str,
    session_name: &str,
    timestamps: bool,
  ) -> anyhow::Result<impl AsyncBufRead + Send + Unpin> {
    let url = format!("{}/sessions/{}/logs", self.base_url(), session_name);
    let resp = self
      .http_client()
      .get(&url)
      .bearer_auth(token)
      .header("X-Manta-Site", self.site_name())
      .query(&[("timestamps", timestamps.to_string())])
      .send()
      .await
      .context("HTTP GET session logs failed")?;

    if !resp.status().is_success() {
      let status = resp.status();
      let body = resp.text().await.unwrap_or_default();
      bail!("GET session logs returned {}: {}", status, body);
    }

    let byte_stream = resp.bytes_stream().map_err(std::io::Error::other);
    Ok(BufReader::new(StreamReader::new(byte_stream)))
  }
}