use {
dashmap::DashMap,
std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
},
tokio::time::interval,
tracing::error,
};
use zus_common::{ConnectionPoolConfig, Result};
use crate::selector::ZooServerSelect;
pub struct ServiceDiscoveryManager {
discovery_clients: Arc<DashMap<String, Arc<ZooServerSelect>>>,
health_check_count: Arc<AtomicUsize>,
}
impl ServiceDiscoveryManager {
pub fn new() -> Arc<Self> {
let manager = Arc::new(Self {
discovery_clients: Arc::new(DashMap::new()),
health_check_count: Arc::new(AtomicUsize::new(0)),
});
let manager_clone = manager.clone();
tokio::spawn(async move {
manager_clone.health_check_loop().await;
});
manager
}
pub async fn get_service_discovery(
&self,
addresses: Vec<String>,
service_path: String,
) -> Result<Arc<ZooServerSelect>> {
self
.get_service_discovery_with_pool_config(addresses, service_path, None)
.await
}
pub async fn get_service_discovery_with_pooling(
&self,
addresses: Vec<String>,
service_path: String,
pool_config: ConnectionPoolConfig,
) -> Result<Arc<ZooServerSelect>> {
self
.get_service_discovery_with_pool_config(addresses, service_path, Some(pool_config))
.await
}
pub async fn get_service_discovery_with_pool_config(
&self,
addresses: Vec<String>,
service_path: String,
pool_config: Option<ConnectionPoolConfig>,
) -> Result<Arc<ZooServerSelect>> {
let cache_key = format!("{}:{}:{}", addresses.join(","), service_path, pool_config.is_some());
if let Some(client) = self.discovery_clients.get(&cache_key) {
return Ok(client.clone());
}
let client = ZooServerSelect::with_pool_config(addresses, service_path, pool_config).await?;
self.discovery_clients.insert(cache_key, client.clone());
Ok(client)
}
async fn health_check_loop(self: Arc<Self>) {
let mut tick = interval(Duration::from_secs(3));
loop {
tick.tick().await;
self.health_check_count.fetch_add(1, Ordering::Relaxed);
for entry in self.discovery_clients.iter() {
if let Err(e) = entry.value().check_server_status().await {
error!("Health check failed for {}: {:?}", entry.key(), e);
}
}
}
}
#[doc(hidden)]
pub fn health_check_count(&self) -> usize {
self.health_check_count.load(Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_manager_creation() {
let manager = ServiceDiscoveryManager::new();
assert_eq!(manager.discovery_clients.len(), 0);
}
}