use crate::domain::{ConfigError, ConfigKey, Result};
use crate::ports::{ChangeCallback, ConfigWatcher};
use etcd_client::{Client, WatchOptions};
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};
#[derive(Debug)]
pub struct EtcdWatcher {
endpoints: Vec<String>,
prefix: Option<String>,
stop_tx: Option<Sender<()>>,
watch_thread: Option<JoinHandle<()>>,
}
impl EtcdWatcher {
pub async fn new<S: AsRef<str>>(endpoints: Vec<S>, prefix: Option<&str>) -> Result<Self> {
let endpoints: Vec<String> = endpoints.iter().map(|s| s.as_ref().to_string()).collect();
let _client =
Client::connect(&endpoints, None)
.await
.map_err(|e| ConfigError::WatcherError {
message: format!("Failed to connect to etcd: {}", e),
source: Some(Box::new(e)),
})?;
Ok(Self {
endpoints,
prefix: prefix.map(|s| s.to_string()),
stop_tx: None,
watch_thread: None,
})
}
}
impl ConfigWatcher for EtcdWatcher {
fn watch(&mut self, callback: ChangeCallback) -> Result<()> {
if self.watch_thread.is_some() {
return Err(ConfigError::WatcherError {
message: "Watcher is already running".to_string(),
source: None,
});
}
let (stop_tx, stop_rx) = channel();
self.stop_tx = Some(stop_tx);
let endpoints = self.endpoints.clone();
let prefix = self.prefix.clone();
let watch_thread = thread::spawn(move || {
let runtime = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
tracing::error!("Failed to create tokio runtime for etcd watcher: {}", e);
return;
}
};
runtime.block_on(async move {
loop {
if stop_rx.try_recv().is_ok() {
tracing::debug!("etcd watcher stopping");
break;
}
let mut client = match Client::connect(&endpoints, None).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to connect to etcd for watching: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue;
}
};
let watch_prefix = prefix.as_deref().unwrap_or("");
tracing::info!("Starting etcd watch on prefix: {}", watch_prefix);
let options = WatchOptions::new().with_prefix();
let (mut _watcher, mut stream) = match client.watch(watch_prefix, Some(options)).await {
Ok((w, s)) => (w, s),
Err(e) => {
tracing::error!("Failed to create etcd watch: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue;
}
};
loop {
if stop_rx.try_recv().is_ok() {
tracing::debug!("etcd watcher stopping");
return;
}
tokio::select! {
Ok(resp) = stream.message() => {
if let Some(watch_resp) = resp {
for event in watch_resp.events() {
if let Some(kv) = event.kv() {
if let Ok(key_str) = kv.key_str() {
let key = if !watch_prefix.is_empty() && key_str.starts_with(watch_prefix) {
&key_str[watch_prefix.len()..]
} else {
key_str
};
let key = key.replace('/', ".");
tracing::debug!("etcd key changed: {}", key);
callback(ConfigKey::from(key));
}
}
}
}
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
}
}
}
}
});
});
self.watch_thread = Some(watch_thread);
Ok(())
}
fn stop(&mut self) -> Result<()> {
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(());
}
if let Some(handle) = self.watch_thread.take() {
handle.join().map_err(|_| ConfigError::WatcherError {
message: "Failed to join etcd watcher thread".to_string(),
source: None,
})?;
}
Ok(())
}
}
impl Drop for EtcdWatcher {
fn drop(&mut self) {
let _ = self.stop();
}
}