#![allow(clippy::manual_try_fold)]
use std::env;
use std::fs::File;
use std::io;
use std::str::FromStr;
use std::thread;
use std::time;
use async_recursion::async_recursion;
use async_trait::async_trait;
use diqwest::DigestAuthSession;
use diqwest::WithDigestAuth;
use indicatif::ProgressBar;
use reqwest::Client;
use reqwest::StatusCode;
use crate::error::Error;
use crate::model::Clusters;
use crate::model::JobId;
use crate::model::JobState;
use crate::model::JobStatus;
use crate::model::LogCollectionJob;
use crate::model::Shard;
use crate::progress::SpinnerHelper;
const MONGODB_URL: &str = "https://cloud.mongodb.com/api/atlas/v1.0/groups";
#[async_trait]
pub trait FtdcLoader {
async fn get_ftdc_data(
&self,
group_key: &str,
replica_set_name: &str,
byte_size: u64,
public: &str,
private: &str,
) -> Result<String, Error>;
}
pub struct FtdcDataService {
pub client: Client,
base_url: String,
}
impl FtdcDataService {
pub fn new(client: Client) -> Self {
Self { client, base_url: MONGODB_URL.to_string() }
}
#[cfg(test)]
fn with_base_url(client: Client, base_url: String) -> Self {
Self { client, base_url }
}
}
#[async_trait]
impl FtdcLoader for FtdcDataService {
async fn get_ftdc_data(
&self,
group_key: &str,
replica_set_name: &str,
byte_size: u64,
public: &str,
private: &str,
) -> Result<String, Error> {
let session = DigestAuthSession::new(public, private);
let replica_set = self
.get_replica_set(group_key, replica_set_name, &session)
.await?;
let job_id = self
.create_ftdc_job(group_key, &replica_set, byte_size, &session)
.await?
.id;
let check_job_status_spinner =
SpinnerHelper::create(format!("Check job status of job with id: {job_id}"));
let _download_url = self
.check_job_state(group_key, &job_id, &check_job_status_spinner?, &session)
.await?;
let download_ftdc_data_spinner = SpinnerHelper::create(format!(
"Start to download FTDC data for job with id: {job_id}"
));
self.download_ftdc_data(
group_key,
&job_id,
&replica_set,
&download_ftdc_data_spinner?,
&session,
)
.await
}
}
impl FtdcDataService {
async fn get_replica_set(
&self,
group_key: &str,
replica_set_name: &str,
session: &DigestAuthSession,
) -> Result<String, Error> {
let processes = self
.client
.get(format!("{}/{group_key}/processes", self.base_url))
.send_digest_auth(session)
.await?;
match processes.status() {
StatusCode::OK => {
let response_body = processes.text().await?;
let shards = serde_json::from_str::<Clusters>(&response_body)?.results;
let shards: Vec<Shard> = shards
.into_iter()
.filter(|s| {
s.replica_set_name.is_some()
&& (s.user_alias.contains(replica_set_name)
|| s.replica_set_name == Some(replica_set_name.to_string()))
})
.collect();
shards
.first()
.and_then(|s| s.replica_set_name.as_ref())
.iter()
.fold(
Err(Error::ReplicaSetNotFound(format!(
"No replica set found that corresponds to {replica_set_name}"
))),
|_, s| Ok(s.to_string()),
)
}
_ => Err(Error::ReplicaSetNotFound(format!(
"Something went wrong trying to get the list of running processes. Please try later. Currently running processes: {processes}",
processes = processes.text().await?
))),
}
}
async fn create_ftdc_job(
&self,
group_key: &str,
replica_set: &str,
byte_size: u64,
session: &DigestAuthSession,
) -> Result<JobId, Error> {
println!("Starting FTDC data job for ReplicaSet: {replica_set}");
let create_ftdc_job = self
.client
.post(format!("{}/{group_key}/logCollectionJobs", self.base_url))
.header("Content-type", "application/json; charset=utf-8")
.json(&LogCollectionJob::from(replica_set, byte_size))
.send_digest_auth(session)
.await?;
match create_ftdc_job.status() {
StatusCode::CREATED => {
let response_body = create_ftdc_job.text().await?;
Ok(serde_json::from_str::<JobId>(&response_body)?)
}
_ => Err(Error::CreateJob(format!(
"Something went wrong creating the FTDC job: {error}",
error = create_ftdc_job.text().await?
))),
}
}
#[async_recursion]
async fn check_job_state(
&self,
group_key: &str,
job_id: &str,
spinner: &ProgressBar,
session: &DigestAuthSession,
) -> Result<String, Error> {
let check_job_status = self
.client
.get(format!(
"{}/{group_key}/logCollectionJobs/{job_id}",
self.base_url
))
.send_digest_auth(session)
.await?;
match check_job_status.status() {
StatusCode::OK => {
let job_status = check_job_status.text().await?;
let job_status = serde_json::from_str::<JobStatus>(&job_status)?;
match JobState::from_str(job_status.status)? {
JobState::InProgress => {
spinner.set_message(format!("IN_PROGRESS – job id: {job_id}"));
thread::sleep(time::Duration::from_millis(3000));
self.check_job_state(group_key, job_id, spinner, session)
.await
}
JobState::Succcess | JobState::MarkedForExpiry => {
spinner.finish_with_message(format!(
"SUCCESS – FTDC data for job with id {job_id} will be downloaded."
));
Ok(String::from(job_status.download_url))
}
JobState::Failure | JobState::Expired => {
spinner.abandon_with_message(format!(
"FAILURE – Something went wrong creating job with id {job_id}."
));
Err(Error::MongoJob(
"Failure while job creation. Please try again.".to_string(),
))
}
}
}
_ => Err(Error::CheckJobStatus(format!(
"Something went wrong checking the jobs status. Try again later. Error message: {error}",
error = check_job_status.text().await?
))),
}
}
async fn download_ftdc_data(
&self,
group_key: &str,
job_id: &str,
replica_set: &str,
spinner: &ProgressBar,
session: &DigestAuthSession,
) -> Result<String, Error> {
let download_url = format!(
"{}/{group_key}/logCollectionJobs/{job_id}/download",
self.base_url
);
let response = self
.client
.get(&download_url)
.send_digest_auth(session)
.await?;
match response.status() {
StatusCode::OK => {
spinner.set_message(format!(
"PROGRESS – Download FTDC data for job with id: {job_id}"
));
let bytes = response.bytes().await?;
let mut slice: &[u8] = bytes.as_ref();
let file_name = format!("ftdc_data_{replica_set}_job_{job_id}.tar.gz");
let file_name = file_name.as_str();
let mut out = File::create(file_name)?;
io::copy(&mut slice, &mut out)?;
spinner.finish_with_message(format!(
"SUCCESS – FTDC data for job with id {job_id} downloaded."
));
Ok(format!(
"{current_dir}/{file_name}",
current_dir = env::current_dir()?.display()
))
}
_ => Err(Error::Download(format!(
"Something went wrong downloading the FTDC data. Try to download at: {url}. Status code: {status}. Body: {body}",
url = download_url,
status = response.status(),
body = response.text().await?
))),
}
}
}
#[cfg(test)]
mod tests {
use crate::model::Clusters;
use crate::model::JobId;
use crate::model::JobStatus;
use crate::model::Shard;
use crate::service::FtdcDataService;
use diqwest::DigestAuthSession;
use indicatif::ProgressBar;
use mockito::Server;
use reqwest::Client;
fn ftdc_data_service(base_url: String) -> FtdcDataService {
FtdcDataService::with_base_url(Client::new(), base_url)
}
#[tokio::test]
async fn given_explicit_replica_set_name_when_get_replica_set_then_get_the_same_name() {
let mut server = Server::new_async().await;
let clusters = Clusters {
results: vec![Shard {
user_alias: "something that does not matter".to_string(),
type_name: "".to_string(),
replica_set_name: Some("my-replica-set".to_string()),
}],
};
let _m = server
.mock("GET", "/my-group-key/processes")
.with_status(200)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&clusters).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let response = ftdc_data_service(server.url())
.get_replica_set("my-group-key", "my-replica-set", &session)
.await
.unwrap();
assert_eq!(&response, "my-replica-set");
}
#[tokio::test]
async fn given_shard_name_when_get_replica_set_then_get_corresponding_replica_set() {
let mut server = Server::new_async().await;
let clusters = Clusters {
results: vec![Shard {
user_alias: "my-rs-shard-00".to_string(),
type_name: "".to_string(),
replica_set_name: Some("my-replica-set".to_string()),
}],
};
let _m = server
.mock("GET", "/my-group-key/processes")
.with_status(200)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&clusters).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let response = ftdc_data_service(server.url())
.get_replica_set("my-group-key", "my-rs-shard-00", &session)
.await
.unwrap();
assert_eq!(&response, "my-replica-set");
}
#[tokio::test]
async fn given_wrong_name_when_get_replica_set_then_no_rs_error() {
let mut server = Server::new_async().await;
let clusters = Clusters {
results: vec![Shard {
user_alias: "my-rs-shard-00".to_string(),
type_name: "".to_string(),
replica_set_name: Some("my-replica-set".to_string()),
}],
};
let _m = server
.mock("GET", "/my-group-key/processes")
.with_status(200)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&clusters).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let replica_set_not_found_error = ftdc_data_service(server.url())
.get_replica_set("my-group-key", "another-rs-shard-00", &session)
.await
.unwrap_err()
.to_string();
assert_eq!(
replica_set_not_found_error,
"No replica set found that corresponds to another-rs-shard-00"
);
}
#[tokio::test]
async fn given_no_processes_in_the_given_group_when_get_replica_set_then_no_rs_error() {
let mut server = Server::new_async().await;
let clusters = Clusters { results: vec![] };
let _m = server
.mock("GET", "/my-group-key/processes")
.with_status(200)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&clusters).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let replica_set_not_found_error = ftdc_data_service(server.url())
.get_replica_set("my-group-key", "another-rs-shard-00", &session)
.await
.unwrap_err()
.to_string();
assert_eq!(
replica_set_not_found_error,
"No replica set found that corresponds to another-rs-shard-00"
);
}
#[tokio::test]
async fn given_replica_set_when_create_ftdc_job_then_give_job_id() {
let mut server = Server::new_async().await;
let job_id = JobId { id: String::from("new-job-id-73") };
let _m = server
.mock("POST", "/my-group-key/logCollectionJobs")
.with_status(201)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&job_id).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let response = ftdc_data_service(server.url())
.create_ftdc_job("my-group-key", "another-rs-shard-00", 10, &session)
.await
.unwrap();
assert_eq!(response, JobId { id: String::from("new-job-id-73") });
}
#[tokio::test]
async fn given_job_id_when_check_job_state_then_give_success() {
let mut server = Server::new_async().await;
let job_id = "new-job-id-73";
let job_status =
JobStatus { id: "any id", download_url: "download from here", status: "SUCCESS" };
let spinner = ProgressBar::new_spinner();
let _m = server
.mock("GET", "/my-group-key/logCollectionJobs/new-job-id-73")
.with_status(200)
.with_header("content-type", "application/json; charset=utf-8")
.with_body(serde_json::to_string(&job_status).unwrap())
.create_async()
.await;
let session = DigestAuthSession::new("", "");
let response = ftdc_data_service(server.url())
.check_job_state("my-group-key", job_id, &spinner, &session)
.await
.unwrap();
assert_eq!(response, String::from("download from here"));
}
}