use crate::error::ConfigError;
use crate::providers::provider::{ConfigProvider, ProviderMetadata, ProviderType};
#[cfg(feature = "encryption")]
use crate::security::{SecureString, SensitivityLevel};
use crate::utils::ssrf::validate_remote_url;
use crate::watcher::TlsConfig;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use failsafe::{
backoff, failure_policy, CircuitBreaker, Config as CircuitBreakerConfig, Error as FailsafeError,
};
use figment::{
providers::Serialized,
value::{Dict, Map},
Figment, Profile,
};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use url::Url;
#[cfg(feature = "encryption")]
#[derive(Clone, Debug)]
pub enum ConsulAclToken {
Token(Arc<SecureString>),
Bearer(Arc<SecureString>),
Identity(Arc<SecureString>),
Agent(Arc<SecureString>),
}
#[cfg(feature = "encryption")]
impl ConsulAclToken {
pub fn token(token: impl Into<String>) -> Self {
ConsulAclToken::Token(Arc::new(SecureString::new(
token.into(),
SensitivityLevel::High,
)))
}
pub fn bearer(token: impl Into<String>) -> Self {
ConsulAclToken::Bearer(Arc::new(SecureString::new(
token.into(),
SensitivityLevel::High,
)))
}
pub fn identity(token: impl Into<String>) -> Self {
ConsulAclToken::Identity(Arc::new(SecureString::new(
token.into(),
SensitivityLevel::High,
)))
}
pub fn agent(token: impl Into<String>) -> Self {
ConsulAclToken::Agent(Arc::new(SecureString::new(
token.into(),
SensitivityLevel::High,
)))
}
fn as_str(&self) -> &str {
match self {
ConsulAclToken::Token(t)
| ConsulAclToken::Bearer(t)
| ConsulAclToken::Identity(t)
| ConsulAclToken::Agent(t) => t.as_str(),
}
}
fn header_name(&self) -> &'static str {
match self {
ConsulAclToken::Token(_) => "X-Consul-Token",
ConsulAclToken::Bearer(_) => "Authorization",
ConsulAclToken::Identity(_) => "X-Consul-Identity",
ConsulAclToken::Agent(_) => "X-Consul-Agent-Token",
}
}
}
#[cfg(not(feature = "encryption"))]
#[derive(Clone, Debug)]
pub enum ConsulAclToken {
Token(String),
Bearer(String),
Identity(String),
Agent(String),
}
#[cfg(not(feature = "encryption"))]
impl ConsulAclToken {
pub fn token(token: impl Into<String>) -> Self {
ConsulAclToken::Token(token.into())
}
pub fn bearer(token: impl Into<String>) -> Self {
ConsulAclToken::Bearer(token.into())
}
pub fn identity(token: impl Into<String>) -> Self {
ConsulAclToken::Identity(token.into())
}
pub fn agent(token: impl Into<String>) -> Self {
ConsulAclToken::Agent(token.into())
}
fn as_str(&self) -> &str {
match self {
ConsulAclToken::Token(t)
| ConsulAclToken::Bearer(t)
| ConsulAclToken::Identity(t)
| ConsulAclToken::Agent(t) => t.as_str(),
}
}
fn header_name(&self) -> &'static str {
match self {
ConsulAclToken::Token(_) => "X-Consul-Token",
ConsulAclToken::Bearer(_) => "Authorization",
ConsulAclToken::Identity(_) => "X-Consul-Identity",
ConsulAclToken::Agent(_) => "X-Consul-Agent-Token",
}
}
}
#[derive(Clone, Default)]
pub struct ConsulAclPolicy {
pub policy_id: Option<String>,
pub role_id: Option<String>,
pub namespace: Option<String>,
pub partition: Option<String>,
pub datacenter: Option<String>,
}
impl ConsulAclPolicy {
pub fn new() -> Self {
Self::default()
}
pub fn with_policy_id(mut self, policy_id: impl Into<String>) -> Self {
self.policy_id = Some(policy_id.into());
self
}
pub fn with_role_id(mut self, role_id: impl Into<String>) -> Self {
self.role_id = Some(role_id.into());
self
}
pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = Some(namespace.into());
self
}
pub fn with_partition(mut self, partition: impl Into<String>) -> Self {
self.partition = Some(partition.into());
self
}
pub fn with_datacenter(mut self, datacenter: impl Into<String>) -> Self {
self.datacenter = Some(datacenter.into());
self
}
}
#[derive(Clone)]
pub struct ConsulConfigProvider {
address: String,
key: String,
token: Option<ConsulAclToken>,
acl_policy: Option<ConsulAclPolicy>,
tls_config: Option<TlsConfig>,
priority: u8,
}
#[derive(serde::Deserialize)]
#[allow(non_snake_case)]
struct ConsulKvPair {
Value: String,
}
impl ConsulConfigProvider {
pub fn new(address: impl Into<String>, key: impl Into<String>) -> Self {
Self {
address: address.into(),
key: key.into(),
token: None,
acl_policy: None,
tls_config: None,
priority: 30,
}
}
pub fn from_address(address: impl Into<String>, key: impl Into<String>) -> Self {
Self::new(address, key)
}
pub fn with_token(mut self, token: impl Into<String>) -> Self {
self.token = Some(ConsulAclToken::token(token));
self
}
#[cfg(feature = "encryption")]
pub fn with_token_secure(mut self, token: Arc<SecureString>) -> Self {
self.token = Some(ConsulAclToken::Token(token));
self
}
pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
self.token = Some(ConsulAclToken::bearer(token));
self
}
pub fn with_identity_token(mut self, token: impl Into<String>) -> Self {
self.token = Some(ConsulAclToken::identity(token));
self
}
pub fn with_agent_token(mut self, token: impl Into<String>) -> Self {
self.token = Some(ConsulAclToken::agent(token));
self
}
pub fn with_acl_policy(mut self, policy: ConsulAclPolicy) -> Self {
self.acl_policy = Some(policy);
self
}
pub fn with_tls(
mut self,
ca_cert: impl Into<PathBuf>,
client_cert: Option<impl Into<PathBuf>>,
client_key: Option<impl Into<PathBuf>>,
) -> Self {
let ca_cert: PathBuf = ca_cert.into();
let mut tls = TlsConfig::new().with_ca_cert(&ca_cert);
if let Some(cert) = client_cert {
tls = tls.with_client_cert(cert);
}
if let Some(key) = client_key {
tls = tls.with_client_key(key);
}
self.tls_config = Some(tls);
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority;
self
}
pub fn with_auth(self, _username: impl Into<String>, _password: impl Into<String>) -> Self {
self
}
fn build_client(&self) -> Result<reqwest::blocking::Client, ConfigError> {
let mut client_builder = reqwest::blocking::Client::builder();
if let Some(tls) = &self.tls_config {
if let Some(ca_path) = tls.ca_cert_path() {
let cert_data = std::fs::read(ca_path).map_err(|e| {
ConfigError::RemoteError(format!("Failed to read CA cert: {}", e))
})?;
let cert = reqwest::Certificate::from_pem(&cert_data).map_err(|e| {
ConfigError::RemoteError(format!("Failed to parse CA cert: {}", e))
})?;
client_builder = client_builder.add_root_certificate(cert);
}
if let (Some(cert_path), Some(key_path)) =
(tls.client_cert_path(), tls.client_key_path())
{
let cert_data = std::fs::read(cert_path).map_err(|e| {
ConfigError::RemoteError(format!("Failed to read client cert: {}", e))
})?;
let key_data = std::fs::read(key_path).map_err(|e| {
ConfigError::RemoteError(format!("Failed to read client key: {}", e))
})?;
let mut combined = cert_data;
combined.extend_from_slice(b"\n");
combined.extend_from_slice(&key_data);
let identity = reqwest::Identity::from_pem(&combined).map_err(|e| {
ConfigError::RemoteError(format!("Failed to parse client identity: {}", e))
})?;
client_builder = client_builder.identity(identity);
}
}
client_builder
.build()
.map_err(|e| ConfigError::RemoteError(format!("Failed to build client: {}", e)))
}
fn build_url(&self) -> Result<Url, ConfigError> {
let mut url = Url::parse(&self.address)
.map_err(|e| ConfigError::RemoteError(format!("Invalid Consul URL: {}", e)))?;
let path = url.path();
let key = &self.key;
let base_path = if path == "/" || path.is_empty() {
format!("/v1/kv/{}", key)
} else if path.ends_with("/v1/kv/") {
format!("{}{}", path, key)
} else if path.contains("/v1/kv") {
format!("{}/{}", path.trim_end_matches('/'), key)
} else {
format!("{}/v1/kv/{}", path.trim_end_matches('/'), key)
};
url.set_path(&base_path);
let mut query_pairs: Vec<(String, String)> = Vec::new();
if let Some(policy) = &self.acl_policy {
if let Some(ns) = &policy.namespace {
query_pairs.push(("ns".to_string(), ns.clone()));
}
if let Some(partition) = &policy.partition {
query_pairs.push(("partition".to_string(), partition.clone()));
}
if let Some(dc) = &policy.datacenter {
query_pairs.push(("dc".to_string(), dc.clone()));
}
if let Some(pid) = &policy.policy_id {
query_pairs.push(("policy".to_string(), pid.clone()));
}
if let Some(rid) = &policy.role_id {
query_pairs.push(("role".to_string(), rid.clone()));
}
}
if !query_pairs.is_empty() {
let query: String = query_pairs
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join("&");
url.set_query(Some(&query));
}
Ok(url)
}
fn fetch_data(&self) -> Result<Map<Profile, Dict>, ConfigError> {
let url = self.build_url()?;
let client = self.build_client()?;
let mut req = client.get(url);
if let Some(token) = &self.token {
req = req.header(token.header_name(), token.as_str());
}
let resp = req
.send()
.map_err(|e| ConfigError::RemoteError(format!("Failed to connect to Consul: {}", e)))?;
if resp.status().is_success() {
let kvs: Vec<ConsulKvPair> = resp.json().map_err(|e| {
ConfigError::RemoteError(format!("Failed to parse Consul response: {}", e))
})?;
if let Some(kv) = kvs.first() {
let val_str = &kv.Value;
let decoded = BASE64.decode(val_str).map_err(|e| {
ConfigError::RemoteError(format!("Base64 decode failed: {}", e))
})?;
let json_str = String::from_utf8(decoded)
.map_err(|e| ConfigError::RemoteError(format!("UTF-8 error: {}", e)))?;
let map: Dict = serde_json::from_str(&json_str).map_err(|e| {
ConfigError::RemoteError(format!("Failed to parse JSON: {}", e))
})?;
let mut profiles = Map::new();
profiles.insert(Profile::Default, map);
Ok(profiles)
} else {
Err(ConfigError::RemoteError(format!(
"Key {} not found in Consul (empty response)",
self.key
)))
}
} else if resp.status() == reqwest::StatusCode::NOT_FOUND {
Err(ConfigError::RemoteError(format!(
"Key {} not found in Consul",
self.key
)))
} else if resp.status() == reqwest::StatusCode::FORBIDDEN {
Err(ConfigError::RemoteError(
"Access denied: ACL token insufficient permissions or invalid".to_string(),
))
} else if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
Err(ConfigError::RemoteError(
"Authentication failed: Invalid or expired ACL token".to_string(),
))
} else {
Err(ConfigError::RemoteError(format!(
"Consul returned error: {}",
resp.status()
)))
}
}
}
impl ConfigProvider for ConsulConfigProvider {
fn load(&self) -> Result<Figment, ConfigError> {
validate_remote_url(&self.address)?;
let circuit_breaker = CircuitBreakerConfig::new()
.failure_policy(failure_policy::consecutive_failures(
3,
backoff::constant(Duration::from_secs(10)),
))
.build();
let result = circuit_breaker.call(|| self.fetch_data());
match result {
Ok(data) => {
let figment = Figment::new().merge(Serialized::from(data, Profile::Default));
Ok(figment)
}
Err(FailsafeError::Inner(e)) => Err(e),
Err(FailsafeError::Rejected) => Err(ConfigError::RemoteError(
"Circuit breaker open: Consul requests rejected".to_string(),
)),
}
}
fn name(&self) -> &str {
"consul"
}
fn is_available(&self) -> bool {
!self.address.is_empty() && self.address.starts_with("http")
}
fn priority(&self) -> u8 {
self.priority
}
fn metadata(&self) -> ProviderMetadata {
ProviderMetadata {
name: self.name().to_string(),
description: format!("Consul provider for key: {}", self.key),
source_type: ProviderType::Remote,
requires_network: true,
supports_watch: false,
priority: self.priority,
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[deprecated(since = "0.4.0", note = "Use ConsulConfigProvider instead")]
pub type ConsulProvider = ConsulConfigProvider;