#![allow(clippy::result_large_err)]
use crate::commons::RetrySettings;
use crate::error::{
EndpointValidationError,
Error::{ClientErrorResponse, NotFound, ServerErrorResponse},
ErrorDetails,
};
use crate::responses;
use backtrace::Backtrace;
use log::trace;
use reqwest::{StatusCode, blocking::Client as HttpClient, header::HeaderMap, redirect};
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Display;
use std::result;
use std::thread::sleep;
use std::time::Duration;
/// Response type produced by the underlying blocking `reqwest` client.
pub type HttpClientResponse = reqwest::blocking::Response;
/// Error type used by this client; an alias of [`crate::error::HttpClientError`].
pub type HttpClientError = crate::error::HttpClientError;
/// Result type whose error is always [`HttpClientError`].
pub type Result<T> = result::Result<T, HttpClientError>;
/// Builder for [`Client`].
///
/// The type parameters `E`, `U` and `P` are the types of the endpoint,
/// username and password respectively; all three default to `&'static str`.
pub struct ClientBuilder<E = &'static str, U = &'static str, P = &'static str> {
/// Base URL of the HTTP API; `build` checks its scheme.
endpoint: E,
/// Username passed to the resulting [`Client`].
username: U,
/// Password passed to the resulting [`Client`].
password: P,
/// Pre-built HTTP client; when `Some`, `build` uses it as is.
client: Option<HttpClient>,
/// Retry settings handed to the resulting [`Client`].
retry_settings: RetrySettings,
/// Connect timeout applied by `build` when it constructs the HTTP client itself.
connect_timeout: Option<Duration>,
/// Request timeout applied by `build` when it constructs the HTTP client itself.
request_timeout: Option<Duration>,
/// Redirect policy applied by `build` when it constructs the HTTP client itself.
redirect_policy: Option<redirect::Policy>,
}
impl Default for ClientBuilder {
fn default() -> Self {
Self::new()
}
}
impl ClientBuilder {
pub fn new() -> Self {
Self {
endpoint: "http://localhost:15672/api",
username: "guest",
password: "guest",
client: None,
retry_settings: RetrySettings::default(),
connect_timeout: None,
request_timeout: None,
redirect_policy: None,
}
}
}
impl<E, U, P> ClientBuilder<E, U, P>
where
    E: Display,
    U: Display,
    P: Display,
{
    /// Applies a preset configuration and leaves every other setting untouched:
    ///
    /// * a request timeout of 60 seconds,
    /// * retry settings of `max_attempts: 3` and `delay_ms: 1000`,
    /// * a redirect policy of `redirect::Policy::none()`.
    pub fn with_recommended_defaults(self) -> Self {
        let mut tuned = self;
        tuned.request_timeout = Some(Duration::from_secs(60));
        tuned.retry_settings = RetrySettings {
            max_attempts: 3,
            delay_ms: 1000,
        };
        tuned.redirect_policy = Some(redirect::Policy::none());
        tuned
    }
}
impl<E, U, P> ClientBuilder<E, U, P>
where
E: Display,
U: Display,
P: Display,
{
/// Replaces the username and password, possibly changing their types.
///
/// All other builder settings are carried over unchanged.
pub fn with_basic_auth_credentials<NewU, NewP>(
    self,
    username: NewU,
    password: NewP,
) -> ClientBuilder<E, NewU, NewP>
where
    NewU: Display,
    NewP: Display,
{
    // Pull out everything except the old credentials, which are discarded.
    let ClientBuilder {
        endpoint,
        client,
        retry_settings,
        connect_timeout,
        request_timeout,
        redirect_policy,
        ..
    } = self;
    ClientBuilder {
        endpoint,
        username,
        password,
        client,
        retry_settings,
        connect_timeout,
        request_timeout,
        redirect_policy,
    }
}
/// Replaces the endpoint, possibly changing its type.
///
/// All other builder settings are carried over unchanged. The endpoint is
/// not validated here; `build` performs the scheme check.
pub fn with_endpoint<T>(self, endpoint: T) -> ClientBuilder<T, U, P>
where
    T: Display,
{
    // Pull out everything except the old endpoint, which is discarded.
    let ClientBuilder {
        username,
        password,
        client,
        retry_settings,
        connect_timeout,
        request_timeout,
        redirect_policy,
        ..
    } = self;
    ClientBuilder {
        endpoint,
        username,
        password,
        client,
        retry_settings,
        connect_timeout,
        request_timeout,
        redirect_policy,
    }
}
/// Supplies a pre-built HTTP client.
///
/// When a client is supplied, `build` uses it as is and does not apply the
/// connect timeout, request timeout or redirect policy stored on the builder.
pub fn with_client(self, client: HttpClient) -> Self {
    let mut builder = self;
    builder.client = Some(client);
    builder
}
pub fn with_connect_timeout(self, timeout: Duration) -> Self {
ClientBuilder {
connect_timeout: Some(timeout),
..self
}
}
pub fn with_request_timeout(self, timeout: Duration) -> Self {
ClientBuilder {
request_timeout: Some(timeout),
..self
}
}
/// Replaces the retry settings handed to the resulting [`Client`].
pub fn with_retry_settings(self, retry_settings: RetrySettings) -> Self {
    let mut builder = self;
    builder.retry_settings = retry_settings;
    builder
}
/// Sets the redirect policy that `build` applies when it constructs the
/// HTTP client itself.
pub fn with_redirect_policy(self, policy: redirect::Policy) -> Self {
    let mut builder = self;
    builder.redirect_policy = Some(policy);
    builder
}
/// Validates the endpoint and constructs a [`Client`].
///
/// The endpoint must start with `http://` or `https://`. The scheme is
/// compared case-insensitively, so `HTTP://host` is accepted as well.
///
/// If a client was supplied via `with_client`, it is used as is and the
/// connect timeout, request timeout and redirect policy stored on this
/// builder are NOT applied. Otherwise a new blocking HTTP client is built
/// with whichever of those settings are present.
///
/// # Errors
///
/// * `EndpointValidationError::UnsupportedScheme` if the endpoint does not
///   start with a supported scheme.
/// * `EndpointValidationError::ClientBuildError` if the HTTP client cannot
///   be constructed.
pub fn build(self) -> result::Result<Client<E, U, P>, EndpointValidationError> {
    let endpoint_str = self.endpoint.to_string();
    // URI schemes are case-insensitive (RFC 3986, section 3.1), so the prefix
    // is compared without regard to ASCII case. `get` returns `None` when the
    // endpoint is shorter than the scheme or the cut is not a char boundary.
    let has_scheme = |scheme: &str| {
        matches!(
            endpoint_str.get(..scheme.len()),
            Some(prefix) if prefix.eq_ignore_ascii_case(scheme)
        )
    };
    let scheme_supported = has_scheme("http://") || has_scheme("https://");
    if !scheme_supported {
        return Err(EndpointValidationError::UnsupportedScheme {
            endpoint: endpoint_str,
        });
    }

    let client = match self.client {
        // A caller-supplied client is used verbatim.
        Some(c) => c,
        None => {
            let mut builder = HttpClient::builder();
            if let Some(timeout) = self.connect_timeout {
                builder = builder.connect_timeout(timeout);
            }
            if let Some(timeout) = self.request_timeout {
                builder = builder.timeout(timeout);
            }
            if let Some(policy) = self.redirect_policy {
                builder = builder.redirect(policy);
            }
            builder
                .build()
                .map_err(|e| EndpointValidationError::ClientBuildError { source: e })?
        }
    };

    Ok(Client::from_http_client_with_retry(
        client,
        self.endpoint,
        self.username,
        self.password,
        self.retry_settings,
    ))
}
}
/// Blocking HTTP API client.
///
/// The type parameters `E`, `U` and `P` are the types of the endpoint,
/// username and password respectively.
pub struct Client<E, U, P> {
/// Base URL that request paths are appended to.
endpoint: E,
/// Username sent as HTTP basic auth with every request.
username: U,
/// Password sent as HTTP basic auth with every request.
password: P,
/// Underlying blocking HTTP client.
client: HttpClient,
/// Controls how many times and how often failed requests are re-run.
retry_settings: RetrySettings,
}
impl<E, U, P> Client<E, U, P>
where
E: Display,
U: Display,
P: Display,
{
/// Creates a client with a default blocking HTTP client and default retry
/// settings.
///
/// The endpoint is not validated here; use the builder's `build` method
/// for a validating, non-panicking constructor.
///
/// # Panics
///
/// Panics if the default blocking HTTP client cannot be constructed.
pub fn new(endpoint: E, username: U, password: P) -> Self {
    let client = HttpClient::builder()
        .build()
        .expect("failed to construct the default blocking HTTP client");
    trace!(
        "Created new blocking RabbitMQ HTTP API client for endpoint: {}",
        endpoint
    );
    Self {
        endpoint,
        username,
        password,
        client,
        retry_settings: RetrySettings::default(),
    }
}
pub fn from_http_client(client: HttpClient, endpoint: E, username: U, password: P) -> Self {
Self {
endpoint,
username,
password,
client,
retry_settings: RetrySettings::default(),
}
}
pub fn from_http_client_with_retry(
client: HttpClient,
endpoint: E,
username: U,
password: P,
retry_settings: RetrySettings,
) -> Self {
Self {
endpoint,
username,
password,
client,
retry_settings,
}
}
/// Returns a [`ClientBuilder`] in its default state (see `ClientBuilder::new`).
pub fn builder() -> ClientBuilder<&'static str, &'static str, &'static str> {
    ClientBuilder::default()
}
/// Runs `operation`, re-running it after a failure according to the retry
/// settings.
///
/// The operation is invoked at most `max_attempts + 1` times: once
/// initially, then up to `max_attempts` more times. Between attempts the
/// current thread sleeps for `delay_ms` milliseconds. Any `Err` triggers a
/// retry; the error kind is not inspected.
///
/// Returns the first successful response, or the error from the final
/// attempt.
fn with_retry<F>(&self, operation: F) -> Result<HttpClientResponse>
where
    F: Fn() -> Result<HttpClientResponse>,
{
    let max_retries = self.retry_settings.max_attempts;
    let mut attempt = 0;
    loop {
        match operation() {
            Ok(response) => {
                if attempt > 0 {
                    trace!("Request succeeded after {} retry attempt(s)", attempt);
                }
                return Ok(response);
            }
            // Retries remain: log, wait, and go around again.
            Err(e) if attempt < max_retries => {
                trace!(
                    "Request failed on attempt {}/{}, retrying in {}ms: {}",
                    attempt + 1,
                    max_retries + 1,
                    self.retry_settings.delay_ms,
                    e
                );
                sleep(Duration::from_millis(self.retry_settings.delay_ms));
                attempt += 1;
            }
            // Out of retries: surface the last error.
            Err(e) => {
                trace!("Request failed after {} attempt(s): {}", max_retries + 1, e);
                return Err(e);
            }
        }
    }
}
/// Performs a GET request against `path` and deserializes the JSON response
/// body into `T`.
///
/// No status codes are exempted from error handling.
pub(crate) fn get_api_request<T, S>(&self, path: S) -> Result<T>
where
    T: DeserializeOwned,
    S: AsRef<str>,
{
    let decoded: T = self.http_get(path, None, None)?.json()?;
    Ok(decoded)
}
/// Performs a GET request against `path?query`, deserializes the body as a
/// `responses::PaginatedResponse<T>` and returns its `items`.
///
/// `query` is appended verbatim after a `?`; it is not encoded here.
pub(crate) fn get_paginated_api_request<T, S>(&self, path: S, query: &str) -> Result<Vec<T>>
where
    T: DeserializeOwned,
    S: AsRef<str>,
{
    let mut target = String::from(path.as_ref());
    target.push('?');
    target.push_str(query);

    let page: responses::PaginatedResponse<T> = self.http_get(target, None, None)?.json()?;
    Ok(page.items)
}
/// Performs a DELETE request against `path`, discarding the response.
///
/// When `idempotent` is `true`, a 404 response is accepted and not reported
/// as an error.
pub(crate) fn delete_api_request_with_optional_not_found<S>(
    &self,
    path: S,
    idempotent: bool,
) -> Result<()>
where
    S: AsRef<str>,
{
    let tolerated = idempotent.then_some(StatusCode::NOT_FOUND);
    self.http_delete(path, tolerated, None).map(|_| ())
}
/// Performs a PUT request against `path` with `payload` as the JSON body,
/// discarding the response.
///
/// No status codes are exempted from error handling.
pub(crate) fn put_api_request<S, T>(&self, path: S, payload: &T) -> Result<()>
where
    S: AsRef<str>,
    T: Serialize,
{
    self.http_put(path, payload, None, None).map(|_| ())
}
/// Sends a GET request to `path` (relative to the endpoint) with basic auth,
/// retrying per the client's retry settings.
///
/// `client_code_to_accept_or_ignore` and `server_code_to_accept_or_ignore`
/// name one 4xx and one 5xx status code, respectively, that should be
/// returned as `Ok` instead of being converted into an error.
pub(crate) fn http_get<S>(
    &self,
    path: S,
    client_code_to_accept_or_ignore: Option<StatusCode>,
    server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
    S: AsRef<str>,
{
    let url = self.rooted_path(path);
    // Credentials are rendered once, outside the retry closure.
    let user = self.username.to_string();
    let secret = self.password.to_string();
    trace!("HTTP GET: {}", url);
    self.with_retry(|| {
        let request = self.client.get(&url).basic_auth(&user, Some(&secret));
        self.ok_or_status_code_error(
            request.send()?,
            client_code_to_accept_or_ignore,
            server_code_to_accept_or_ignore,
        )
    })
}
pub(crate) fn http_put<S, T>(
&self,
path: S,
payload: &T,
client_code_to_accept_or_ignore: Option<StatusCode>,
server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
S: AsRef<str>,
T: Serialize,
{
let rooted_path = self.rooted_path(path);
let username = self.username.to_string();
let password = self.password.to_string();
if let Ok(body) = serde_json::to_string_pretty(payload) {
trace!("HTTP PUT: {}\nRequest body:\n{}", rooted_path, body);
} else {
trace!("HTTP PUT: {}", rooted_path);
}
self.with_retry(|| {
let response = self
.client
.put(&rooted_path)
.json(payload)
.basic_auth(&username, Some(&password))
.send()?;
self.ok_or_status_code_error(
response,
client_code_to_accept_or_ignore,
server_code_to_accept_or_ignore,
)
})
}
pub(crate) fn http_post<S, T>(
&self,
path: S,
payload: &T,
client_code_to_accept_or_ignore: Option<StatusCode>,
server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
S: AsRef<str>,
T: Serialize,
{
let rooted_path = self.rooted_path(path);
let username = self.username.to_string();
let password = self.password.to_string();
if let Ok(body) = serde_json::to_string_pretty(payload) {
trace!("HTTP POST: {}\nRequest body:\n{}", rooted_path, body);
} else {
trace!("HTTP POST: {}", rooted_path);
}
self.with_retry(|| {
let response = self
.client
.post(&rooted_path)
.json(payload)
.basic_auth(&username, Some(&password))
.send()?;
self.ok_or_status_code_error(
response,
client_code_to_accept_or_ignore,
server_code_to_accept_or_ignore,
)
})
}
pub(crate) fn http_post_without_body<S>(
&self,
path: S,
client_code_to_accept_or_ignore: Option<StatusCode>,
server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
S: AsRef<str>,
{
let rooted_path = self.rooted_path(path);
let username = self.username.to_string();
let password = self.password.to_string();
self.with_retry(|| {
let response = self
.client
.post(&rooted_path)
.basic_auth(&username, Some(&password))
.send()?;
self.ok_or_status_code_error(
response,
client_code_to_accept_or_ignore,
server_code_to_accept_or_ignore,
)
})
}
/// Sends a DELETE request to `path` (relative to the endpoint) with basic
/// auth, retrying per the client's retry settings.
///
/// `client_code_to_accept_or_ignore` and `server_code_to_accept_or_ignore`
/// name one 4xx and one 5xx status code, respectively, that should be
/// returned as `Ok` instead of being converted into an error.
pub(crate) fn http_delete<S>(
    &self,
    path: S,
    client_code_to_accept_or_ignore: Option<StatusCode>,
    server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
    S: AsRef<str>,
{
    let url = self.rooted_path(path);
    // Credentials are rendered once, outside the retry closure.
    let user = self.username.to_string();
    let secret = self.password.to_string();
    trace!("HTTP DELETE: {}", url);
    self.with_retry(|| {
        let request = self.client.delete(&url).basic_auth(&user, Some(&secret));
        self.ok_or_status_code_error(
            request.send()?,
            client_code_to_accept_or_ignore,
            server_code_to_accept_or_ignore,
        )
    })
}
pub(crate) fn http_delete_with_headers<S>(
&self,
path: S,
headers: HeaderMap,
client_code_to_accept_or_ignore: Option<StatusCode>,
server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse>
where
S: AsRef<str>,
{
let rooted_path = self.rooted_path(path);
let username = self.username.to_string();
let password = self.password.to_string();
self.with_retry(|| {
let response = self
.client
.delete(&rooted_path)
.basic_auth(&username, Some(&password))
.headers(headers.clone())
.send()?;
self.ok_or_status_code_error(
response,
client_code_to_accept_or_ignore,
server_code_to_accept_or_ignore,
)
})
}
/// Converts error status codes into errors, passing every other response
/// through unchanged.
///
/// * A 404 yields `NotFound` unless `client_code_to_accept_or_ignore` is
///   exactly 404.
/// * Any other 4xx yields `ClientErrorResponse` unless it equals
///   `client_code_to_accept_or_ignore`.
/// * Any 5xx yields `ServerErrorResponse` unless it equals
///   `server_code_to_accept_or_ignore`.
///
/// For the two response errors the body is read in full, so reading it may
/// itself fail and that failure is returned instead.
pub(crate) fn ok_or_status_code_error(
    &self,
    response: HttpClientResponse,
    client_code_to_accept_or_ignore: Option<StatusCode>,
    server_code_to_accept_or_ignore: Option<StatusCode>,
) -> Result<HttpClientResponse> {
    let status = response.status();

    // 404 has a dedicated error variant and is checked before the generic 4xx path.
    let not_found_tolerated = client_code_to_accept_or_ignore == Some(StatusCode::NOT_FOUND);
    if status == StatusCode::NOT_FOUND && !not_found_tolerated {
        trace!("Resource not found (404) at {}", response.url());
        return Err(NotFound);
    }

    let rejected_client_error =
        status.is_client_error() && client_code_to_accept_or_ignore != Some(status);
    let rejected_server_error =
        status.is_server_error() && server_code_to_accept_or_ignore != Some(status);
    if !rejected_client_error && !rejected_server_error {
        return Ok(response);
    }

    // Capture metadata first: reading the body consumes the response.
    let url = response.url().clone();
    let headers = response.headers().clone();
    let body = response.text()?;

    if rejected_client_error {
        trace!("Client error response: {} from {}: {}", status, url, body);
        let error_details = ErrorDetails::from_json(&body);
        Err(ClientErrorResponse {
            url: Some(url),
            body: Some(body),
            error_details,
            headers: Some(headers),
            status_code: status,
            backtrace: Backtrace::new(),
        })
    } else {
        trace!("Server error response: {} from {}: {}", status, url, body);
        let error_details = ErrorDetails::from_json(&body);
        Err(ServerErrorResponse {
            url: Some(url),
            body: Some(body),
            error_details,
            headers: Some(headers),
            status_code: status,
            backtrace: Backtrace::new(),
        })
    }
}
/// Joins the endpoint and `path` with a single `/`.
///
/// Trailing slashes on the endpoint are removed first, so an endpoint such
/// as `http://localhost:15672/api/` does not produce a `//` in the resulting
/// URL. `path` is appended verbatim.
pub(crate) fn rooted_path<S>(&self, path: S) -> String
where
    S: AsRef<str>,
{
    let endpoint = self.endpoint.to_string();
    format!("{}/{}", endpoint.trim_end_matches('/'), path.as_ref())
}
}
impl Default for Client<&'static str, &'static str, &'static str> {
fn default() -> Self {
Self::new("http://localhost:15672/api", "guest", "guest")
}
}