use crate::domain::{ConfigError, ConfigKey, ConfigValue, Result};
use crate::ports::ConfigSource;
use etcd_client::{Client, GetOptions};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
/// Dedicated Tokio runtime used to drive etcd reloads from synchronous code.
///
/// The runtime is built lazily on first access and lives for the rest of the
/// process. It is a multi-threaded runtime with every available driver
/// enabled.
///
/// # Panics
///
/// The first access panics if the runtime cannot be created.
static RELOAD_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create reload runtime for etcd adapter")
});
/// Configuration source backed by an etcd cluster.
///
/// All keys under the configured prefix are fetched when the adapter is
/// constructed and kept in an in-memory cache; lookups are answered from that
/// cache until the adapter is reloaded.
pub struct EtcdAdapter {
// Connected etcd client created in `new`; cloned before each use in `load_all_keys`.
client: Arc<Client>,
// Endpoints the adapter was created with; `reload_sync` reconnects using them.
endpoints: Vec<String>,
// Optional key prefix: limits the range query and is stripped from cached keys.
prefix: Option<String>,
// Value reported through `ConfigSource::priority`; `new` sets it to 1.
priority: u8,
// Snapshot of the fetched entries, keyed by the normalised key
// (prefix removed, every '/' replaced by '.').
cache: HashMap<String, String>,
}
/// Manual `Debug` implementation: the etcd client is rendered as a fixed
/// placeholder string, every other field is printed with its own `Debug`
/// output.
impl fmt::Debug for EtcdAdapter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // implementation to be revisited.
        let Self {
            client: _,
            endpoints,
            prefix,
            priority,
            cache,
        } = self;

        let mut out = f.debug_struct("EtcdAdapter");
        out.field("client", &"<etcd::Client>");
        out.field("endpoints", endpoints);
        out.field("prefix", prefix);
        out.field("priority", priority);
        out.field("cache", cache);
        out.finish()
    }
}
impl EtcdAdapter {
fn validate_prefix(prefix: &str) -> Result<()> {
if prefix.contains(['\0', '\n', '\r']) {
return Err(ConfigError::SourceError {
source_name: "etcd".to_string(),
message: "Prefix contains invalid characters".to_string(),
source: None,
});
}
Ok(())
}
pub async fn new<S: AsRef<str>>(endpoints: Vec<S>, prefix: Option<&str>) -> Result<Self> {
if let Some(p) = prefix {
Self::validate_prefix(p)?;
}
let endpoints: Vec<String> = endpoints.iter().map(|s| s.as_ref().to_string()).collect();
let client =
Client::connect(&endpoints, None)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to connect to etcd: {}", e),
source: Some(Box::new(e)),
})?;
let mut adapter = Self {
client: Arc::new(client),
endpoints: endpoints.clone(),
prefix: prefix.map(|s| s.to_string()),
priority: 1,
cache: HashMap::new(),
};
adapter.load_all_keys().await?;
Ok(adapter)
}
pub async fn with_priority<S: AsRef<str>>(
endpoints: Vec<S>,
prefix: Option<&str>,
priority: u8,
) -> Result<Self> {
let mut adapter = Self::new(endpoints, prefix).await?;
adapter.priority = priority;
Ok(adapter)
}
async fn load_all_keys(&mut self) -> Result<()> {
let prefix = self.prefix.as_deref().unwrap_or("");
let mut client = (*self.client).clone();
let options = GetOptions::new().with_prefix();
let response =
client
.get(prefix, Some(options))
.await
.map_err(|e| ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to fetch keys from etcd: {}", e),
source: Some(Box::new(e)),
})?;
self.cache.clear();
for kv in response.kvs() {
if let (Ok(key), Ok(value)) = (kv.key_str(), kv.value_str()) {
let key = if !prefix.is_empty() && key.starts_with(prefix) {
&key[prefix.len()..]
} else {
key
};
let key = key.replace('/', ".");
self.cache.insert(key, value.to_string());
}
}
Ok(())
}
fn reload_sync(&mut self) -> Result<()> {
let endpoints = self.endpoints.clone();
let prefix = self.prefix.clone();
let new_cache = if tokio::runtime::Handle::try_current().is_ok() {
let handle = std::thread::spawn(move || {
RELOAD_RUNTIME.block_on(async move {
let prefix_str = prefix.as_deref().unwrap_or("");
let mut client = Client::connect(&endpoints, None).await.map_err(|e| {
ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to connect to etcd: {}", e),
source: Some(Box::new(e)),
}
})?;
let options = GetOptions::new().with_prefix();
let response = client.get(prefix_str, Some(options)).await.map_err(|e| {
ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to fetch keys from etcd: {}", e),
source: Some(Box::new(e)),
}
})?;
let mut new_cache = HashMap::new();
for kv in response.kvs() {
if let (Ok(key), Ok(value)) = (kv.key_str(), kv.value_str()) {
let key = if !prefix_str.is_empty() && key.starts_with(prefix_str) {
&key[prefix_str.len()..]
} else {
key
};
let key = key.replace('/', ".");
new_cache.insert(key, value.to_string());
}
}
Ok::<HashMap<String, String>, ConfigError>(new_cache)
})
});
handle.join().map_err(|_| ConfigError::SourceError {
source_name: "etcd".to_string(),
message: "Failed to join reload thread".to_string(),
source: None,
})?
} else {
RELOAD_RUNTIME.block_on(async move {
let prefix_str = prefix.as_deref().unwrap_or("");
let mut client = Client::connect(&endpoints, None).await.map_err(|e| {
ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to connect to etcd: {}", e),
source: Some(Box::new(e)),
}
})?;
let options = GetOptions::new().with_prefix();
let response = client.get(prefix_str, Some(options)).await.map_err(|e| {
ConfigError::SourceError {
source_name: "etcd".to_string(),
message: format!("Failed to fetch keys from etcd: {}", e),
source: Some(Box::new(e)),
}
})?;
let mut new_cache = HashMap::new();
for kv in response.kvs() {
if let (Ok(key), Ok(value)) = (kv.key_str(), kv.value_str()) {
let key = if !prefix_str.is_empty() && key.starts_with(prefix_str) {
&key[prefix_str.len()..]
} else {
key
};
let key = key.replace('/', ".");
new_cache.insert(key, value.to_string());
}
}
Ok::<HashMap<String, String>, ConfigError>(new_cache)
})
}?;
self.cache = new_cache;
Ok(())
}
}
/// `ConfigSource` implementation that serves reads from the in-memory cache.
impl ConfigSource for EtcdAdapter {
    /// Fixed identifier of this source.
    fn name(&self) -> &str {
        "etcd"
    }

    /// Priority assigned to the adapter at construction time.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Looks `key` up in the cache; returns `Ok(None)` when it is not cached.
    fn get(&self, key: &ConfigKey) -> Result<Option<ConfigValue>> {
        let found = match self.cache.get(key.as_str()) {
            Some(raw) => Some(ConfigValue::from(raw.as_str())),
            None => None,
        };
        Ok(found)
    }

    /// Lists every key currently held in the cache, in the map's iteration
    /// order.
    fn all_keys(&self) -> Result<Vec<ConfigKey>> {
        let mut keys = Vec::with_capacity(self.cache.len());
        for name in self.cache.keys() {
            keys.push(ConfigKey::from(name.as_str()));
        }
        Ok(keys)
    }

    /// Refreshes the cache by delegating to `reload_sync`.
    fn reload(&mut self) -> Result<()> {
        self.reload_sync()
    }
}