use crate::actions::ActionHandler;
use crate::alerts::AlertHandler;
use crate::bdb::BdbHandler;
use crate::bdb_groups::BdbGroupsHandler;
use crate::bootstrap::BootstrapHandler;
use crate::cluster::ClusterHandler;
use crate::cm_settings::CmSettingsHandler;
use crate::crdb::CrdbHandler;
use crate::crdb_tasks::CrdbTasksHandler;
use crate::debuginfo::DebugInfoHandler;
use crate::diagnostics::DiagnosticsHandler;
use crate::endpoints::EndpointsHandler;
use crate::error::{RestError, Result};
use crate::job_scheduler::JobSchedulerHandler;
use crate::jsonschema::JsonSchemaHandler;
use crate::ldap_mappings::LdapMappingHandler;
use crate::license::LicenseHandler;
use crate::local::LocalHandler;
use crate::logs::LogsHandler;
use crate::migrations::MigrationsHandler;
use crate::modules::ModuleHandler;
use crate::nodes::NodeHandler;
use crate::ocsp::OcspHandler;
use crate::proxies::ProxyHandler;
use crate::redis_acls::RedisAclHandler;
use crate::roles::RolesHandler;
use crate::services::ServicesHandler;
use crate::shards::ShardHandler;
use crate::stats::StatsHandler;
use crate::suffixes::SuffixesHandler;
use crate::usage_report::UsageReportHandler;
use crate::users::UserHandler;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use reqwest::{Client, Response};
use serde::{Serialize, de::DeserializeOwned};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, trace};
/// `User-Agent` value used when the builder is not given one explicitly:
/// the literal `redis-enterprise/` followed by this crate's version, which is
/// resolved at compile time from `CARGO_PKG_VERSION`.
const DEFAULT_USER_AGENT: &str = concat!("redis-enterprise/", env!("CARGO_PKG_VERSION"));
/// Alternative name for [`EnterpriseClientBuilder`].
pub type RestConfig = EnterpriseClientBuilder;
/// Builder collecting the connection settings for an `EnterpriseClient`.
///
/// `Debug` is implemented by hand rather than derived so that formatting or
/// logging a builder never prints the configured password.
#[derive(Clone)]
pub struct EnterpriseClientBuilder {
    /// Root URL of the REST API.
    base_url: String,
    /// User name for HTTP basic authentication, if one was supplied.
    username: Option<String>,
    /// Password for HTTP basic authentication, if one was supplied.
    password: Option<String>,
    /// Timeout handed to the underlying HTTP client.
    timeout: Duration,
    /// When `true`, TLS certificate validation is disabled.
    insecure: bool,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Path to a PEM file holding an additional CA certificate.
    ca_cert_path: Option<std::path::PathBuf>,
    /// In-memory PEM bytes of an additional CA certificate.
    ca_cert_pem: Option<Vec<u8>>,
}

impl std::fmt::Debug for EnterpriseClientBuilder {
    /// Formats the builder with the password replaced by a placeholder and
    /// the inline CA PEM summarised by its length.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal *whether* a password is set, never its value.
        let password = self.password.as_ref().map(|_| "<redacted>");
        // A raw byte dump of a certificate is noise; its size is enough.
        let ca_cert_pem = self
            .ca_cert_pem
            .as_ref()
            .map(|pem| format!("<{} bytes>", pem.len()));
        f.debug_struct("EnterpriseClientBuilder")
            .field("base_url", &self.base_url)
            .field("username", &self.username)
            .field("password", &password)
            .field("timeout", &self.timeout)
            .field("insecure", &self.insecure)
            .field("user_agent", &self.user_agent)
            .field("ca_cert_path", &self.ca_cert_path)
            .field("ca_cert_pem", &ca_cert_pem)
            .finish()
    }
}
impl Default for EnterpriseClientBuilder {
    /// Returns a builder pointing at `https://localhost:9443` with no
    /// credentials, a 30-second timeout, certificate validation enabled, the
    /// crate's default user agent and no extra CA certificate.
    fn default() -> Self {
        let base_url = String::from("https://localhost:9443");
        let user_agent = String::from(DEFAULT_USER_AGENT);
        Self {
            base_url,
            user_agent,
            timeout: Duration::from_secs(30),
            insecure: false,
            username: None,
            password: None,
            ca_cert_path: None,
            ca_cert_pem: None,
        }
    }
}
impl EnterpriseClientBuilder {
    /// Creates a builder populated with the [`Default`] settings.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the root URL of the REST API.
    #[must_use]
    pub fn base_url(self, url: impl Into<String>) -> Self {
        Self {
            base_url: url.into(),
            ..self
        }
    }

    /// Sets the user name used for HTTP basic authentication.
    #[must_use]
    pub fn username(self, username: impl Into<String>) -> Self {
        Self {
            username: Some(username.into()),
            ..self
        }
    }

    /// Sets the password used for HTTP basic authentication.
    #[must_use]
    pub fn password(self, password: impl Into<String>) -> Self {
        Self {
            password: Some(password.into()),
            ..self
        }
    }

    /// Sets the timeout applied by the underlying HTTP client.
    #[must_use]
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Enables (`true`) or disables (`false`) acceptance of invalid TLS
    /// certificates.
    #[must_use]
    pub fn insecure(self, insecure: bool) -> Self {
        Self { insecure, ..self }
    }

    /// Overrides the `User-Agent` header value.
    #[must_use]
    pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
        Self {
            user_agent: user_agent.into(),
            ..self
        }
    }

    /// Sets the path of a PEM file containing an additional CA certificate.
    ///
    /// The file is read when [`build`](Self::build) runs. If both a path and
    /// in-memory PEM bytes are configured, the path is the one that is used.
    #[must_use]
    pub fn ca_cert(self, path: impl Into<std::path::PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Supplies an additional CA certificate as in-memory PEM bytes.
    ///
    /// Ignored by [`build`](Self::build) when a CA file path is also set.
    #[must_use]
    pub fn ca_cert_pem(self, pem: impl Into<Vec<u8>>) -> Self {
        Self {
            ca_cert_pem: Some(pem.into()),
            ..self
        }
    }

    /// Consumes the builder and constructs the client.
    ///
    /// Missing user name / password become empty strings.
    ///
    /// # Errors
    ///
    /// Returns `RestError::ConnectionError` when the user agent is not a
    /// valid header value, the CA file cannot be read, the CA PEM cannot be
    /// parsed, or the underlying HTTP client fails to build.
    pub fn build(self) -> Result<EnterpriseClient> {
        /// Parses one PEM-encoded CA certificate.
        fn parse_ca(pem: &[u8]) -> Result<reqwest::Certificate> {
            reqwest::Certificate::from_pem(pem)
                .map_err(|e| RestError::ConnectionError(format!("Invalid CA certificate: {}", e)))
        }

        let Self {
            base_url,
            username,
            password,
            timeout,
            insecure,
            user_agent,
            ca_cert_path,
            ca_cert_pem,
        } = self;

        // Validate the user agent first so a bad value is reported before any
        // file system access happens.
        let agent = HeaderValue::from_str(&user_agent)
            .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {}", e)))?;
        let mut headers = HeaderMap::new();
        headers.insert(USER_AGENT, agent);

        // A CA file path takes precedence over in-memory PEM bytes.
        let extra_root = match (&ca_cert_path, &ca_cert_pem) {
            (Some(path), _) => {
                let pem = std::fs::read(path).map_err(|e| {
                    RestError::ConnectionError(format!(
                        "Failed to read CA certificate from {:?}: {}",
                        path, e
                    ))
                })?;
                Some(parse_ca(pem.as_slice())?)
            }
            (None, Some(pem)) => Some(parse_ca(pem.as_slice())?),
            (None, None) => None,
        };

        let mut http = Client::builder()
            .timeout(timeout)
            .default_headers(headers);
        if let Some(cert) = extra_root {
            http = http.tls_certs_merge([cert]);
        }
        if insecure {
            http = http.tls_danger_accept_invalid_certs(true);
        }

        let client = match http.build() {
            Ok(client) => client,
            Err(e) => return Err(RestError::ConnectionError(e.to_string())),
        };

        Ok(EnterpriseClient {
            base_url,
            username: username.unwrap_or_default(),
            password: password.unwrap_or_default(),
            timeout,
            client: Arc::new(client),
        })
    }
}
/// HTTP client for the REST API, created through [`EnterpriseClientBuilder`].
///
/// Cloning is cheap with respect to the HTTP client: the `reqwest::Client` is
/// held behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct EnterpriseClient {
/// Root URL that request paths are appended to (see `normalize_url`).
base_url: String,
/// User name sent via HTTP basic authentication on every request.
username: String,
/// Password sent via HTTP basic authentication on every request.
password: String,
/// Timeout the client was built with; exposed through `timeout()`.
timeout: Duration,
/// Shared underlying HTTP client.
client: Arc<Client>,
}
/// Alternative name for [`EnterpriseClient`].
pub type RestClient = EnterpriseClient;
impl EnterpriseClient {
/// Starts a new [`EnterpriseClientBuilder`] with the default settings.
pub fn builder() -> EnterpriseClientBuilder {
    EnterpriseClientBuilder::default()
}
/// Returns another handle to the shared underlying `reqwest::Client`.
pub fn client(&self) -> Arc<Client> {
    // Only the reference count is bumped; the HTTP client itself is shared.
    Arc::clone(&self.client)
}
/// Returns the timeout this client was built with.
#[must_use]
pub fn timeout(&self) -> Duration {
self.timeout
}
/// Joins the base URL and `path` with exactly one `/` between them.
///
/// Trailing slashes on the base URL and leading slashes on `path` are
/// stripped before joining.
fn normalize_url(&self, path: &str) -> String {
    let mut url = String::from(self.base_url.trim_end_matches('/'));
    url.push('/');
    url.push_str(path.trim_start_matches('/'));
    url
}
/// Builds a client from environment variables.
///
/// | Variable | Meaning | Default |
/// |---|---|---|
/// | `REDIS_ENTERPRISE_URL` | base URL | `https://localhost:9443` |
/// | `REDIS_ENTERPRISE_USER` | user name | `admin@redis.local` |
/// | `REDIS_ENTERPRISE_PASSWORD` | password | *required* |
/// | `REDIS_ENTERPRISE_INSECURE` | skip TLS verification | `false` |
/// | `REDIS_ENTERPRISE_CA_CERT` | path to a CA PEM file | unset |
///
/// `REDIS_ENTERPRISE_INSECURE` is matched case-insensitively, ignoring
/// surrounding whitespace; `true`, `1`, `yes` and `on` enable insecure mode
/// and every other value leaves certificate verification on. An empty
/// `REDIS_ENTERPRISE_CA_CERT` is treated the same as an unset one.
///
/// # Errors
///
/// Returns `RestError::AuthenticationFailed` when the password variable is
/// missing, or any error produced by `EnterpriseClientBuilder::build`.
pub fn from_env() -> Result<Self> {
    use std::env;

    /// Interprets an environment flag. `str::parse::<bool>` only accepts the
    /// exact strings `true`/`false`, which silently ignored common spellings
    /// such as `1` or `TRUE`.
    fn is_truthy(raw: &str) -> bool {
        matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "true" | "1" | "yes" | "on"
        )
    }

    let base_url = env::var("REDIS_ENTERPRISE_URL")
        .unwrap_or_else(|_| "https://localhost:9443".to_string());
    let username =
        env::var("REDIS_ENTERPRISE_USER").unwrap_or_else(|_| "admin@redis.local".to_string());
    let password =
        env::var("REDIS_ENTERPRISE_PASSWORD").map_err(|_| RestError::AuthenticationFailed)?;
    // Unset or unrecognised values keep TLS verification enabled.
    let insecure = env::var("REDIS_ENTERPRISE_INSECURE")
        .map(|raw| is_truthy(&raw))
        .unwrap_or(false);
    // An exported-but-empty variable would otherwise be read as the path "".
    let ca_cert = env::var("REDIS_ENTERPRISE_CA_CERT")
        .ok()
        .filter(|path| !path.is_empty());

    let mut builder = Self::builder()
        .base_url(base_url)
        .username(username)
        .password(password)
        .insecure(insecure);
    if let Some(ca_cert_path) = ca_cert {
        builder = builder.ca_cert(ca_cert_path);
    }
    builder.build()
}
/// Sends an authenticated `GET` to `path` and deserializes the JSON response
/// into `T`.
///
/// Transport failures are converted by `map_reqwest_error`; status handling
/// and decoding are delegated to `handle_response`.
pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
    let url = self.normalize_url(path);
    debug!("GET {}", url);
    let request = self
        .client
        .get(&url)
        .basic_auth(&self.username, Some(&self.password));
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    trace!("Response status: {}", response.status());
    self.handle_response(response).await
}
pub async fn get_text(&self, path: &str) -> Result<String> {
let url = self.normalize_url(path);
debug!("GET {} (text)", url);
let response = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| self.map_reqwest_error(e, &url))?;
trace!("Response status: {}", response.status());
if response.status().is_success() {
let text = response.text().await?;
Ok(text)
} else {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
Err(crate::error::RestError::ApiError {
code: status.as_u16(),
message: error_text,
})
}
}
pub async fn get_binary(&self, path: &str) -> Result<Vec<u8>> {
let url = self.normalize_url(path);
debug!("GET {} (binary)", url);
let response = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| self.map_reqwest_error(e, &url))?;
trace!("Response status: {}", response.status());
trace!(
"Response content-type: {:?}",
response.headers().get("content-type")
);
if response.status().is_success() {
let bytes = response.bytes().await?;
Ok(bytes.to_vec())
} else {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
Err(crate::error::RestError::ApiError {
code: status.as_u16(),
message: error_text,
})
}
}
/// Sends an authenticated `POST` with `body` serialized as JSON and
/// deserializes the JSON response into `T`.
///
/// Transport failures are converted by `map_reqwest_error`; status handling
/// and decoding are delegated to `handle_response`.
pub async fn post<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
    let url = self.normalize_url(path);
    debug!("POST {}", url);
    trace!("Request body: {:?}", serde_json::to_value(body).ok());
    let request = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    trace!("Response status: {}", response.status());
    self.handle_response(response).await
}
/// Sends an authenticated `PUT` with `body` serialized as JSON and
/// deserializes the JSON response into `T`.
///
/// Transport failures are converted by `map_reqwest_error`; status handling
/// and decoding are delegated to `handle_response`.
pub async fn put<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
    let url = self.normalize_url(path);
    debug!("PUT {}", url);
    trace!("Request body: {:?}", serde_json::to_value(body).ok());
    let request = self
        .client
        .put(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    trace!("Response status: {}", response.status());
    self.handle_response(response).await
}
/// Sends an authenticated `DELETE` to `path`, discarding any response body
/// on success.
///
/// # Errors
///
/// Any non-success status yields `RestError::ApiError` with the status code
/// and response body (empty if the body cannot be read). Transport failures
/// are converted by `map_reqwest_error`.
pub async fn delete(&self, path: &str) -> Result<()> {
    let url = self.normalize_url(path);
    debug!("DELETE {}", url);
    let request = self
        .client
        .delete(&url)
        .basic_auth(&self.username, Some(&self.password));
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    let status = response.status();
    trace!("Response status: {}", status);
    if status.is_success() {
        return Ok(());
    }
    let message = response.text().await.unwrap_or_default();
    Err(RestError::ApiError {
        code: status.as_u16(),
        message,
    })
}
/// `GET`s `path` and returns the response as untyped JSON.
///
/// Thin wrapper over `get` with the target type fixed to `serde_json::Value`.
pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
    self.get::<serde_json::Value>(path).await
}
/// `POST`s an untyped JSON `body` to `path` and returns the response as
/// untyped JSON.
///
/// Thin wrapper over `post` with both type parameters fixed to
/// `serde_json::Value`.
pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
    self.post::<serde_json::Value, serde_json::Value>(path, &body)
        .await
}
/// `PUT`s an untyped JSON `body` to `path` and returns the response as
/// untyped JSON.
///
/// Thin wrapper over `put` with both type parameters fixed to
/// `serde_json::Value`.
pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
    self.put::<serde_json::Value, serde_json::Value>(path, &body)
        .await
}
/// Sends an authenticated `POST` with a JSON `body` where only success or
/// failure matters; the response body is not parsed on success.
///
/// # Errors
///
/// Any non-success status yields `RestError::ApiError` with the status code
/// and response body (empty if the body cannot be read). Transport failures
/// are converted by `map_reqwest_error`.
pub async fn post_action<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
    let url = self.normalize_url(path);
    debug!("POST {}", url);
    trace!("Request body: {:?}", serde_json::to_value(body).ok());
    let request = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    let status = response.status();
    trace!("Response status: {}", status);
    if status.is_success() {
        return Ok(());
    }
    let message = response.text().await.unwrap_or_default();
    Err(RestError::ApiError {
        code: status.as_u16(),
        message,
    })
}
/// Sends an authenticated `PUT` with a JSON `body` where only success or
/// failure matters; the response body is not parsed on success.
///
/// # Errors
///
/// Any non-success status yields `RestError::ApiError` with the status code
/// and response body (empty if the body cannot be read). Transport failures
/// are converted by `map_reqwest_error`.
pub async fn put_action<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
    let url = self.normalize_url(path);
    debug!("PUT {}", url);
    trace!("Request body: {:?}", serde_json::to_value(body).ok());
    let request = self
        .client
        .put(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    let status = response.status();
    trace!("Response status: {}", status);
    if status.is_success() {
        return Ok(());
    }
    let message = response.text().await.unwrap_or_default();
    Err(RestError::ApiError {
        code: status.as_u16(),
        message,
    })
}
/// Uploads `file_data` as a single-part `multipart/form-data` `POST` and
/// deserializes the JSON response into `T`.
///
/// `field_name` is the form field the part is attached under and `file_name`
/// is the file name reported for that part. Status handling and decoding are
/// delegated to `handle_response`.
pub async fn post_multipart<T: DeserializeOwned>(
    &self,
    path: &str,
    file_data: Vec<u8>,
    field_name: &str,
    file_name: &str,
) -> Result<T> {
    let url = self.normalize_url(path);
    debug!("POST {} (multipart)", url);

    let upload = reqwest::multipart::Part::bytes(file_data).file_name(String::from(file_name));
    let form = reqwest::multipart::Form::new().part(String::from(field_name), upload);

    let request = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .multipart(form);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    trace!("Response status: {}", response.status());
    self.handle_response(response).await
}
/// Returns a clone of this client.
pub fn rest_client(&self) -> Self {
    Clone::clone(self)
}
/// `POST`s a JSON `body` to `path`, tolerating empty or non-JSON success
/// responses.
///
/// On a success status:
/// * an empty or whitespace-only body becomes `{"status": "success"}`;
/// * a body that parses as JSON is returned as-is;
/// * any other body is wrapped as `{"status": "success", "response": <text>}`.
///
/// # Errors
///
/// Any non-success status yields `RestError::ApiError` with the status code
/// and response body. Transport failures are converted by
/// `map_reqwest_error`.
pub async fn post_bootstrap<B: Serialize>(
    &self,
    path: &str,
    body: &B,
) -> Result<serde_json::Value> {
    let url = self.normalize_url(path);
    let request = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };

    let status = response.status();
    // A body that cannot be read is treated as empty on both paths below.
    let text = response.text().await.unwrap_or_default();
    if !status.is_success() {
        return Err(RestError::ApiError {
            code: status.as_u16(),
            message: text,
        });
    }

    // `trim().is_empty()` also covers the completely empty string.
    if text.trim().is_empty() {
        return Ok(serde_json::json!({"status": "success"}));
    }
    match serde_json::from_str::<serde_json::Value>(&text) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Ok(serde_json::json!({"status": "success", "response": text})),
    }
}
/// Sends an authenticated `PATCH` with an untyped JSON `body` and returns the
/// response as untyped JSON.
///
/// # Errors
///
/// * `RestError::ApiError` for any non-success status.
/// * `RestError::ParseError` when a success response cannot be decoded as
///   JSON.
/// * Transport failures are converted by `map_reqwest_error`.
pub async fn patch_raw(
    &self,
    path: &str,
    body: serde_json::Value,
) -> Result<serde_json::Value> {
    let url = self.normalize_url(path);
    let request = self
        .client
        .patch(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(&body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };

    let status = response.status();
    if !status.is_success() {
        let message = response.text().await.unwrap_or_default();
        return Err(RestError::ApiError {
            code: status.as_u16(),
            message,
        });
    }
    match response.json::<serde_json::Value>().await {
        Ok(value) => Ok(value),
        Err(err) => Err(RestError::ParseError(err.to_string())),
    }
}
/// Sends an authenticated `DELETE` to `path` and returns the response as
/// untyped JSON.
///
/// A success response with an empty (or whitespace-only) body is reported as
/// `{"status": "deleted"}`. The body itself is inspected rather than the
/// advertised content length, so an empty body whose length is not known up
/// front (for example with chunked transfer encoding) is handled the same way
/// instead of failing JSON decoding.
///
/// # Errors
///
/// * `RestError::ApiError` for any non-success status.
/// * `RestError::ParseError` when a success body cannot be read or is
///   non-empty but not valid JSON.
/// * Transport failures are converted by `map_reqwest_error`.
pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
    let url = self.normalize_url(path);
    let response = self
        .client
        .delete(&url)
        .basic_auth(&self.username, Some(&self.password))
        .send()
        .await
        .map_err(|e| self.map_reqwest_error(e, &url))?;

    let status = response.status();
    if !status.is_success() {
        let text = response.text().await.unwrap_or_default();
        return Err(RestError::ApiError {
            code: status.as_u16(),
            message: text,
        });
    }

    // Body read failures keep mapping to ParseError, as they did when the
    // body was consumed through `Response::json`.
    let bytes = response
        .bytes()
        .await
        .map_err(|e| RestError::ParseError(e.to_string()))?;
    if bytes.iter().all(u8::is_ascii_whitespace) {
        return Ok(serde_json::json!({"status": "deleted"}));
    }
    serde_json::from_slice(&bytes).map_err(|e| RestError::ParseError(e.to_string()))
}
fn is_tls_error(error: &reqwest::Error) -> bool {
use std::error::Error;
let mut source = error.source();
while let Some(err) = source {
let msg = err.to_string().to_lowercase();
if msg.contains("certificate")
|| msg.contains("cert")
|| msg.contains("invalidcertificate")
|| msg.contains("handshake")
{
return true;
}
source = err.source();
}
false
}
/// Converts a `reqwest::Error` raised while talking to `url` into a
/// `RestError` with a more actionable message.
///
/// Checks are applied in this order, first match wins: TLS heuristics
/// (`is_tls_error`), connect failure, timeout, decode failure, error carrying
/// an HTTP status, request construction failure, and finally a generic
/// `RequestFailed`.
fn map_reqwest_error(&self, error: reqwest::Error, url: &str) -> RestError {
    if Self::is_tls_error(&error) {
        let message = format!(
            "TLS certificate verification failed for {}. The server may be using a self-signed certificate. To proceed, set `insecure = true` in your profile or provide a CA certificate via `ca_cert`.",
            url
        );
        return RestError::TlsError(message);
    }

    if error.is_connect() {
        return RestError::ConnectionError(format!(
            "Failed to connect to {}: Connection refused or host unreachable. Check if the Redis Enterprise server is running and accessible.",
            url
        ));
    }

    if error.is_timeout() {
        return RestError::Timeout;
    }

    if error.is_decode() {
        return RestError::ConnectionError(format!(
            "Failed to decode JSON response from {}: {}. Server may have returned invalid JSON or HTML error page.",
            url, error
        ));
    }

    if let Some(status) = error.status() {
        let code = status.as_u16();
        return RestError::ApiError {
            code,
            message: format!("HTTP {} from {}: {}", code, url, error),
        };
    }

    if error.is_request() {
        return RestError::ConnectionError(format!(
            "Request to {} failed: {}. Check URL format and network settings.",
            url, error
        ));
    }

    RestError::RequestFailed(error.to_string())
}
/// Turns an HTTP response into either a deserialized `T` or a `RestError`.
///
/// Success responses are decoded as JSON; decode failures report the path of
/// the offending field. Non-success statuses are mapped as follows:
/// `401` → `Unauthorized`, `404` → `NotFound`, `409` → `Conflict`,
/// `429` → `RateLimited` (with no retry hint), `503` → `ClusterBusy`,
/// other `5xx` → `ServerError`, anything else → `ApiError`.
async fn handle_response<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
    let status = response.status();

    if !status.is_success() {
        let text = response.text().await.unwrap_or_default();
        let code = status.as_u16();
        return Err(match code {
            401 => RestError::Unauthorized,
            404 => RestError::NotFound,
            409 => RestError::Conflict(text),
            429 => RestError::RateLimited { retry_after: None },
            // Must come before the general 5xx arm.
            503 => RestError::ClusterBusy,
            500..=599 => RestError::ServerError(text),
            _ => RestError::ApiError {
                code,
                message: text,
            },
        });
    }

    let bytes = response.bytes().await?;
    let mut deserializer = serde_json::Deserializer::from_slice(&bytes);
    match serde_path_to_error::deserialize(&mut deserializer) {
        Ok(value) => Ok(value),
        Err(err) => Err(RestError::ParseError(format!(
            "Failed to deserialize field '{}': {}",
            err.path(),
            err.inner()
        ))),
    }
}
/// `POST`s `command` to `/v1/bdbs/{db_uid}/command` as `{"command": ...}`
/// and returns the JSON response.
///
/// Transport failures are converted by `map_reqwest_error`; status handling
/// and decoding are delegated to `handle_response`.
pub async fn execute_command(&self, db_uid: u32, command: &str) -> Result<serde_json::Value> {
    let endpoint = format!("/v1/bdbs/{}/command", db_uid);
    let url = self.normalize_url(&endpoint);
    let payload = serde_json::json!({ "command": command });
    debug!("Executing command on database {}: {}", db_uid, command);

    let request = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(&payload);
    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => return Err(self.map_reqwest_error(err, &url)),
    };
    self.handle_response(response).await
}
// Handler accessors: each one constructs the named handler from a clone of
// this client.

/// Returns an [`ActionHandler`] built from a clone of this client.
#[must_use]
pub fn actions(&self) -> ActionHandler {
ActionHandler::new(self.clone())
}
/// Returns an [`AlertHandler`] built from a clone of this client.
#[must_use]
pub fn alerts(&self) -> AlertHandler {
AlertHandler::new(self.clone())
}
/// Returns a [`BdbHandler`] built from a clone of this client.
#[must_use]
pub fn databases(&self) -> BdbHandler {
BdbHandler::new(self.clone())
}
/// Returns a [`BdbGroupsHandler`] built from a clone of this client.
#[must_use]
pub fn database_groups(&self) -> BdbGroupsHandler {
BdbGroupsHandler::new(self.clone())
}
/// Returns a [`BootstrapHandler`] built from a clone of this client.
#[must_use]
pub fn bootstrap(&self) -> BootstrapHandler {
BootstrapHandler::new(self.clone())
}
/// Returns a [`ClusterHandler`] built from a clone of this client.
#[must_use]
pub fn cluster(&self) -> ClusterHandler {
ClusterHandler::new(self.clone())
}
/// Returns a [`CmSettingsHandler`] built from a clone of this client.
#[must_use]
pub fn cm_settings(&self) -> CmSettingsHandler {
CmSettingsHandler::new(self.clone())
}
/// Returns a [`CrdbHandler`] built from a clone of this client.
#[must_use]
pub fn crdb(&self) -> CrdbHandler {
CrdbHandler::new(self.clone())
}
/// Returns a [`CrdbTasksHandler`] built from a clone of this client.
#[must_use]
pub fn crdb_tasks(&self) -> CrdbTasksHandler {
CrdbTasksHandler::new(self.clone())
}
/// Returns a [`DebugInfoHandler`] built from a clone of this client.
#[must_use]
pub fn debug_info(&self) -> DebugInfoHandler {
DebugInfoHandler::new(self.clone())
}
/// Returns a [`DiagnosticsHandler`] built from a clone of this client.
#[must_use]
pub fn diagnostics(&self) -> DiagnosticsHandler {
DiagnosticsHandler::new(self.clone())
}
/// Returns an [`EndpointsHandler`] built from a clone of this client.
#[must_use]
pub fn endpoints(&self) -> EndpointsHandler {
EndpointsHandler::new(self.clone())
}
/// Returns a [`JobSchedulerHandler`] built from a clone of this client.
#[must_use]
pub fn job_scheduler(&self) -> JobSchedulerHandler {
JobSchedulerHandler::new(self.clone())
}
/// Returns a [`JsonSchemaHandler`] built from a clone of this client.
#[must_use]
pub fn json_schema(&self) -> JsonSchemaHandler {
JsonSchemaHandler::new(self.clone())
}
/// Returns an [`LdapMappingHandler`] built from a clone of this client.
#[must_use]
pub fn ldap_mappings(&self) -> LdapMappingHandler {
LdapMappingHandler::new(self.clone())
}
/// Returns a [`LicenseHandler`] built from a clone of this client.
#[must_use]
pub fn license(&self) -> LicenseHandler {
LicenseHandler::new(self.clone())
}
/// Returns a [`LocalHandler`] built from a clone of this client.
#[must_use]
pub fn local(&self) -> LocalHandler {
LocalHandler::new(self.clone())
}
/// Returns a [`LogsHandler`] built from a clone of this client.
#[must_use]
pub fn logs(&self) -> LogsHandler {
LogsHandler::new(self.clone())
}
/// Returns a [`MigrationsHandler`] built from a clone of this client.
#[must_use]
pub fn migrations(&self) -> MigrationsHandler {
MigrationsHandler::new(self.clone())
}
/// Returns a [`ModuleHandler`] built from a clone of this client.
#[must_use]
pub fn modules(&self) -> ModuleHandler {
ModuleHandler::new(self.clone())
}
/// Returns a [`NodeHandler`] built from a clone of this client.
#[must_use]
pub fn nodes(&self) -> NodeHandler {
NodeHandler::new(self.clone())
}
/// Returns an [`OcspHandler`] built from a clone of this client.
#[must_use]
pub fn ocsp(&self) -> OcspHandler {
OcspHandler::new(self.clone())
}
/// Returns a [`ProxyHandler`] built from a clone of this client.
#[must_use]
pub fn proxies(&self) -> ProxyHandler {
ProxyHandler::new(self.clone())
}
/// Returns a [`RedisAclHandler`] built from a clone of this client.
#[must_use]
pub fn redis_acls(&self) -> RedisAclHandler {
RedisAclHandler::new(self.clone())
}
/// Returns a [`RolesHandler`] built from a clone of this client.
#[must_use]
pub fn roles(&self) -> RolesHandler {
RolesHandler::new(self.clone())
}
/// Returns a [`ServicesHandler`] built from a clone of this client.
#[must_use]
pub fn services(&self) -> ServicesHandler {
ServicesHandler::new(self.clone())
}
/// Returns a [`ShardHandler`] built from a clone of this client.
#[must_use]
pub fn shards(&self) -> ShardHandler {
ShardHandler::new(self.clone())
}
/// Returns a [`StatsHandler`] built from a clone of this client.
#[must_use]
pub fn stats(&self) -> StatsHandler {
StatsHandler::new(self.clone())
}
/// Returns a [`SuffixesHandler`] built from a clone of this client.
#[must_use]
pub fn suffixes(&self) -> SuffixesHandler {
SuffixesHandler::new(self.clone())
}
/// Returns a [`UsageReportHandler`] built from a clone of this client.
#[must_use]
pub fn usage_reports(&self) -> UsageReportHandler {
UsageReportHandler::new(self.clone())
}
/// Returns a [`UserHandler`] built from a clone of this client.
#[must_use]
pub fn users(&self) -> UserHandler {
UserHandler::new(self.clone())
}
}
#[cfg(feature = "tower-integration")]
pub mod tower_support {
use super::*;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
/// HTTP method carried by an `ApiRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
/// `GET`
Get,
/// `POST`
Post,
/// `PUT`
Put,
/// `PATCH`
Patch,
/// `DELETE`
Delete,
}
/// Request value accepted by the `tower::Service` implementation on
/// `EnterpriseClient`.
#[derive(Debug, Clone)]
pub struct ApiRequest {
/// HTTP method to use.
pub method: Method,
/// Request path, passed through to the corresponding client method.
pub path: String,
/// Optional JSON body.
pub body: Option<serde_json::Value>,
}
impl ApiRequest {
pub fn get(path: impl Into<String>) -> Self {
Self {
method: Method::Get,
path: path.into(),
body: None,
}
}
pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Post,
path: path.into(),
body: Some(body),
}
}
pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Put,
path: path.into(),
body: Some(body),
}
}
pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
Self {
method: Method::Patch,
path: path.into(),
body: Some(body),
}
}
pub fn delete(path: impl Into<String>) -> Self {
Self {
method: Method::Delete,
path: path.into(),
body: None,
}
}
}
/// Response value produced by the `tower::Service` implementation on
/// `EnterpriseClient`.
#[derive(Debug, Clone)]
pub struct ApiResponse {
/// Status code reported for the call. The `Service` implementation in this
/// module sets it to `200` for every successful call.
pub status: u16,
/// JSON body of the response.
pub body: serde_json::Value,
}
impl EnterpriseClient {
/// Returns `self` unchanged. `EnterpriseClient` itself implements
/// `tower::Service<ApiRequest>` in this module, so no wrapper is created.
pub fn into_service(self) -> Self {
self
}
}
impl Service<ApiRequest> for EnterpriseClient {
type Response = ApiResponse;
type Error = RestError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ApiRequest) -> Self::Future {
let client = self.clone();
Box::pin(async move {
let response: serde_json::Value = match req.method {
Method::Get => client.get(&req.path).await?,
Method::Post => {
let body = req.body.ok_or_else(|| {
RestError::ValidationError("POST request requires a body".to_string())
})?;
client.post(&req.path, &body).await?
}
Method::Put => {
let body = req.body.ok_or_else(|| {
RestError::ValidationError("PUT request requires a body".to_string())
})?;
client.put(&req.path, &body).await?
}
Method::Patch => {
let body = req.body.ok_or_else(|| {
RestError::ValidationError("PATCH request requires a body".to_string())
})?;
client.patch_raw(&req.path, body).await?
}
Method::Delete => {
client.delete(&req.path).await?;
serde_json::json!({"status": "deleted"})
}
};
Ok(ApiResponse {
status: 200,
body: response,
})
})
}
}
}