use crate::{CloudError as RestError, Result};
use reqwest::Client;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use serde::Serialize;
use std::sync::Arc;
use tracing::{debug, instrument, trace};
const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
#[derive(Debug, Clone)]
pub struct CloudClientBuilder {
api_key: Option<String>,
api_secret: Option<String>,
base_url: String,
timeout: std::time::Duration,
user_agent: String,
}
impl Default for CloudClientBuilder {
fn default() -> Self {
Self {
api_key: None,
api_secret: None,
base_url: "https://api.redislabs.com/v1".to_string(),
timeout: std::time::Duration::from_secs(30),
user_agent: DEFAULT_USER_AGENT.to_string(),
}
}
}
impl CloudClientBuilder {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn api_key(mut self, key: impl Into<String>) -> Self {
self.api_key = Some(key.into());
self
}
#[must_use]
pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
self.api_secret = Some(secret.into());
self
}
#[must_use]
pub fn base_url(mut self, url: impl Into<String>) -> Self {
self.base_url = url.into();
self
}
#[must_use]
pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = user_agent.into();
self
}
pub fn build(self) -> Result<CloudClient> {
let api_key = self
.api_key
.ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
let api_secret = self
.api_secret
.ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
let mut default_headers = HeaderMap::new();
default_headers.insert(
USER_AGENT,
HeaderValue::from_str(&self.user_agent)
.map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {e}")))?,
);
let client = Client::builder()
.timeout(self.timeout)
.default_headers(default_headers)
.build()
.map_err(|e| RestError::ConnectionError(e.to_string()))?;
Ok(CloudClient {
api_key,
api_secret,
base_url: self.base_url,
timeout: self.timeout,
client: Arc::new(client),
})
}
}
#[derive(Clone)]
pub struct CloudClient {
pub(crate) api_key: String,
pub(crate) api_secret: String,
pub(crate) base_url: String,
pub(crate) timeout: std::time::Duration,
pub(crate) client: Arc<Client>,
}
impl CloudClient {
#[must_use]
pub fn builder() -> CloudClientBuilder {
CloudClientBuilder::new()
}
#[must_use]
pub fn timeout(&self) -> std::time::Duration {
self.timeout
}
#[must_use]
pub fn account(&self) -> crate::AccountHandler {
crate::AccountHandler::new(self.clone())
}
#[must_use]
pub fn subscriptions(&self) -> crate::SubscriptionHandler {
crate::SubscriptionHandler::new(self.clone())
}
#[must_use]
pub fn databases(&self) -> crate::DatabaseHandler {
crate::DatabaseHandler::new(self.clone())
}
#[must_use]
pub fn fixed_subscriptions(&self) -> crate::FixedSubscriptionHandler {
crate::FixedSubscriptionHandler::new(self.clone())
}
#[must_use]
pub fn fixed_databases(&self) -> crate::FixedDatabaseHandler {
crate::FixedDatabaseHandler::new(self.clone())
}
#[must_use]
pub fn acl(&self) -> crate::AclHandler {
crate::AclHandler::new(self.clone())
}
#[must_use]
pub fn users(&self) -> crate::UserHandler {
crate::UserHandler::new(self.clone())
}
#[must_use]
pub fn tasks(&self) -> crate::TaskHandler {
crate::TaskHandler::new(self.clone())
}
#[must_use]
pub fn cloud_accounts(&self) -> crate::CloudAccountHandler {
crate::CloudAccountHandler::new(self.clone())
}
#[must_use]
pub fn vpc_peering(&self) -> crate::VpcPeeringHandler {
crate::VpcPeeringHandler::new(self.clone())
}
#[must_use]
pub fn transit_gateway(&self) -> crate::TransitGatewayHandler {
crate::TransitGatewayHandler::new(self.clone())
}
#[must_use]
pub fn psc(&self) -> crate::PscHandler {
crate::PscHandler::new(self.clone())
}
#[must_use]
pub fn private_link(&self) -> crate::PrivateLinkHandler {
crate::PrivateLinkHandler::new(self.clone())
}
#[must_use]
pub fn cost_reports(&self) -> crate::CostReportHandler {
crate::CostReportHandler::new(self.clone())
}
fn normalize_url(&self, path: &str) -> String {
let base = self.base_url.trim_end_matches('/');
let path = path.trim_start_matches('/');
format!("{base}/{path}")
}
fn status_to_error(status: reqwest::StatusCode, text: String) -> RestError {
match status.as_u16() {
400 => RestError::BadRequest { message: text },
401 => RestError::AuthenticationFailed { message: text },
403 => RestError::Forbidden { message: text },
404 => RestError::NotFound { message: text },
412 => RestError::PreconditionFailed,
429 => RestError::RateLimited { message: text },
500 => RestError::InternalServerError { message: text },
503 => RestError::ServiceUnavailable { message: text },
_ => RestError::ApiError {
code: status.as_u16(),
message: text,
},
}
}
#[instrument(skip(self), fields(method = "GET"))]
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
let url = self.normalize_url(path);
debug!("GET {}", url);
let response = self
.client
.get(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.send()
.await?;
trace!("Response status: {}", response.status());
self.handle_response(response).await
}
#[instrument(skip(self, body), fields(method = "POST"))]
pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> Result<T> {
let url = self.normalize_url(path);
debug!("POST {}", url);
trace!("Request body: {:?}", serde_json::to_value(body).ok());
let response = self
.client
.post(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.json(body)
.send()
.await?;
trace!("Response status: {}", response.status());
self.handle_response(response).await
}
#[instrument(skip(self, body), fields(method = "PUT"))]
pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> Result<T> {
let url = self.normalize_url(path);
debug!("PUT {}", url);
trace!("Request body: {:?}", serde_json::to_value(body).ok());
let response = self
.client
.put(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.json(body)
.send()
.await?;
trace!("Response status: {}", response.status());
self.handle_response(response).await
}
#[instrument(skip(self), fields(method = "DELETE"))]
pub async fn delete(&self, path: &str) -> Result<()> {
let url = self.normalize_url(path);
debug!("DELETE {}", url);
let response = self
.client
.delete(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.send()
.await?;
trace!("Response status: {}", response.status());
if response.status().is_success() {
Ok(())
} else {
let status = response.status();
let text = response
.text()
.await
.unwrap_or_else(|e| format!("(failed to read response body: {e})"));
Err(Self::status_to_error(status, text))
}
}
#[instrument(skip(self), fields(method = "GET"))]
pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
self.get(path).await
}
#[instrument(skip(self), fields(method = "GET"))]
pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
let url = self.normalize_url(path);
debug!("GET {} (bytes)", url);
let response = self
.client
.get(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.send()
.await?;
trace!("Response status: {}", response.status());
let status = response.status();
if status.is_success() {
response
.bytes()
.await
.map(|b| b.to_vec())
.map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))
} else {
let text = response
.text()
.await
.unwrap_or_else(|e| format!("(failed to read response body: {e})"));
Err(Self::status_to_error(status, text))
}
}
#[instrument(skip(self, body), fields(method = "POST"))]
pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
self.post(path, &body).await
}
#[instrument(skip(self, body), fields(method = "PUT"))]
pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
self.put(path, &body).await
}
#[instrument(skip(self, body), fields(method = "PATCH"))]
pub async fn patch_raw(
&self,
path: &str,
body: serde_json::Value,
) -> Result<serde_json::Value> {
let url = self.normalize_url(path);
debug!("PATCH {}", url);
trace!("Request body: {:?}", body);
let response = self
.client
.patch(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.json(&body)
.send()
.await?;
trace!("Response status: {}", response.status());
self.handle_response(response).await
}
#[instrument(skip(self), fields(method = "DELETE"))]
pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
let url = self.normalize_url(path);
debug!("DELETE {}", url);
let response = self
.client
.delete(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.send()
.await?;
trace!("Response status: {}", response.status());
if response.status().is_success() {
if response.content_length() == Some(0) {
Ok(serde_json::json!({"status": "deleted"}))
} else {
response.json().await.map_err(Into::into)
}
} else {
let status = response.status();
let text = response
.text()
.await
.unwrap_or_else(|e| format!("(failed to read response body: {e})"));
Err(Self::status_to_error(status, text))
}
}
#[instrument(skip(self, body), fields(method = "DELETE"))]
pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: serde_json::Value,
) -> Result<T> {
let url = self.normalize_url(path);
debug!("DELETE {} (with body)", url);
trace!("Request body: {:?}", body);
let response = self
.client
.delete(&url)
.header("x-api-key", &self.api_key)
.header("x-api-secret-key", &self.api_secret)
.json(&body)
.send()
.await?;
trace!("Response status: {}", response.status());
self.handle_response(response).await
}
#[cfg(feature = "tower-integration")]
async fn handle_response_with_status(
&self,
response: reqwest::Response,
) -> Result<(u16, serde_json::Value)> {
let status = response.status();
let status_code = status.as_u16();
if status.is_success() {
let bytes = response
.bytes()
.await
.map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
let value: serde_json::Value = serde_json::from_slice(&bytes).map_err(|e| {
RestError::ConnectionError(format!("Failed to parse JSON response: {e}"))
})?;
Ok((status_code, value))
} else {
let text = response
.text()
.await
.unwrap_or_else(|e| format!("(failed to read response body: {e})"));
Err(Self::status_to_error(status, text))
}
}
async fn handle_response<T: serde::de::DeserializeOwned>(
&self,
response: reqwest::Response,
) -> Result<T> {
let status = response.status();
if status.is_success() {
let bytes = response
.bytes()
.await
.map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
serde_path_to_error::deserialize(deserializer).map_err(|err| {
let path = err.path().to_string();
RestError::ConnectionError(format!(
"Failed to deserialize field '{}': {}",
path,
err.inner()
))
})
} else {
let text = response
.text()
.await
.unwrap_or_else(|e| format!("(failed to read response body: {e})"));
Err(Self::status_to_error(status, text))
}
}
}
#[cfg(feature = "tower-integration")]
pub mod tower_support {
use super::{CloudClient, RestError, Result};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
Get,
Post,
Put,
Patch,
Delete,
}
#[derive(Debug, Clone)]
pub struct ApiRequest {
pub method: Method,
pub path: String,
pub body: Option<serde_json::Value>,
}
impl ApiRequest {
pub fn get(path: impl Into<String>) -> Self {
Self {
method: Method::Get,
path: path.into(),
body: None,
}
}
pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Post,
path: path.into(),
body: Some(body),
}
}
pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Put,
path: path.into(),
body: Some(body),
}
}
pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Patch,
path: path.into(),
body: Some(body),
}
}
pub fn delete(path: impl Into<String>) -> Self {
Self {
method: Method::Delete,
path: path.into(),
body: None,
}
}
}
#[derive(Debug, Clone)]
pub struct ApiResponse {
pub status: u16,
pub body: serde_json::Value,
}
impl CloudClient {
#[must_use]
pub fn into_service(self) -> Self {
self
}
}
impl Service<ApiRequest> for CloudClient {
type Response = ApiResponse;
type Error = RestError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ApiRequest) -> Self::Future {
let client = self.clone();
Box::pin(async move {
let url = client.normalize_url(&req.path);
let request_builder = match req.method {
Method::Get => client.client.get(&url),
Method::Post => {
let body = req.body.ok_or_else(|| RestError::BadRequest {
message: "POST request requires a body".to_string(),
})?;
client.client.post(&url).json(&body)
}
Method::Put => {
let body = req.body.ok_or_else(|| RestError::BadRequest {
message: "PUT request requires a body".to_string(),
})?;
client.client.put(&url).json(&body)
}
Method::Patch => {
let body = req.body.ok_or_else(|| RestError::BadRequest {
message: "PATCH request requires a body".to_string(),
})?;
client.client.patch(&url).json(&body)
}
Method::Delete => client.client.delete(&url),
};
let response = request_builder
.header("x-api-key", &client.api_key)
.header("x-api-secret-key", &client.api_secret)
.send()
.await?;
let (status, body) = client.handle_response_with_status(response).await?;
Ok(ApiResponse { status, body })
})
}
}
}