use {bytes::Bytes, dashmap::DashMap, std::sync::Arc, tracing::info};
use zus_common::{ConnectionPoolConfig, Result, RpcEndpoint};
use crate::{
pool::{ServerActiveStatus, ServiceEndpointPool},
zooserver::ZusZooClient,
};
/// Endpoint selector that keeps one [`ServiceEndpointPool`] per service path.
///
/// Pools are stored in a concurrent map keyed by service path and are all built
/// with the same [`ZusZooClient`] handle and the same optional
/// [`ConnectionPoolConfig`].
pub struct ZooServerSelect {
    /// Addresses handed to `ZusZooClient::new` at construction time.
    // Stored but never read back by this type, hence the lint allowance.
    #[allow(dead_code)]
    addresses: Vec<String>,
    /// Shared client handle passed to every pool this selector creates.
    zoo_client: Arc<ZusZooClient>,
    /// Pools created so far, keyed by service path.
    service_pools: Arc<DashMap<String, Arc<ServiceEndpointPool>>>,
    /// Configuration forwarded to every pool; `None` when the caller supplied none.
    pool_config: Option<ConnectionPoolConfig>,
}
impl ZooServerSelect {
    /// Creates a selector for `service_path` without a [`ConnectionPoolConfig`].
    ///
    /// Equivalent to calling [`Self::with_pool_config`] with `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::with_pool_config`].
    pub async fn new(addresses: Vec<String>, service_path: String) -> Result<Arc<Self>> {
        Self::with_pool_config(addresses, service_path, None).await
    }

    /// Creates a selector for `service_path` using the given `pool_config`.
    ///
    /// Equivalent to calling [`Self::with_pool_config`] with `Some(pool_config)`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::with_pool_config`].
    pub async fn new_with_pooling(
        addresses: Vec<String>,
        service_path: String,
        pool_config: ConnectionPoolConfig,
    ) -> Result<Arc<Self>> {
        Self::with_pool_config(addresses, service_path, Some(pool_config)).await
    }

    /// Builds the client from `addresses`, then creates and registers the pool
    /// for `service_path` before returning the shared selector.
    ///
    /// # Errors
    ///
    /// Returns an error if `ZusZooClient::new` fails or if the initial pool for
    /// `service_path` cannot be created.
    pub async fn with_pool_config(
        addresses: Vec<String>,
        service_path: String,
        pool_config: Option<ConnectionPoolConfig>,
    ) -> Result<Arc<Self>> {
        let zoo_client = ZusZooClient::new(addresses.clone()).await?;
        let select = Arc::new(Self {
            addresses,
            // The local handle is not used again, so it is moved rather than cloned.
            zoo_client,
            service_pools: Arc::new(DashMap::new()),
            pool_config,
        });
        select.init_path(service_path).await?;
        Ok(select)
    }

    /// Creates a new pool for `path` using this selector's client and pool config.
    ///
    /// The pool is only constructed here; registering it in `service_pools` is
    /// left to the caller.
    async fn build_pool(&self, path: &str) -> Result<Arc<ServiceEndpointPool>> {
        let pool = ServiceEndpointPool::with_pool_config(
            path.to_string(),
            Arc::clone(&self.zoo_client),
            self.pool_config.clone(),
        )
        .await?;
        Ok(pool)
    }

    /// Creates the pool for `service_path` and stores it in `service_pools`,
    /// replacing any pool already stored under that key.
    async fn init_path(&self, service_path: String) -> Result<()> {
        let pool = self.build_pool(&service_path).await?;
        self.service_pools.insert(service_path, pool);
        Ok(())
    }

    /// Returns the pool registered for `path`, creating and registering one if
    /// none exists yet.
    ///
    /// # Errors
    ///
    /// Returns an error if a new pool has to be created and creation fails.
    async fn get_or_create_pool(&self, path: &str) -> Result<Arc<ServiceEndpointPool>> {
        if let Some(existing) = self.service_pools.get(path) {
            return Ok(Arc::clone(existing.value()));
        }

        let created = self.build_pool(path).await?;

        // Another task may have registered a pool for this path while we were
        // awaiting above. Keep whichever pool was registered first so every
        // caller shares the same instance instead of silently replacing it.
        // The entry guard is a temporary and is released at the end of this
        // statement.
        let pool = Arc::clone(
            self.service_pools
                .entry(path.to_string())
                .or_insert(created)
                .value(),
        );
        Ok(pool)
    }

    /// Selects one endpoint from the pool for `path`, creating the pool on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot be created or if the pool's own
    /// `select_one_server` fails.
    pub async fn select_one_server(&self, path: &str) -> Result<Arc<RpcEndpoint>> {
        let pool = self.get_or_create_pool(path).await?;
        pool.select_one_server()
    }

    /// Returns every endpoint reported by the pool for `path`, creating the pool
    /// on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot be created.
    pub async fn get_all_servers(&self, path: &str) -> Result<Vec<Arc<RpcEndpoint>>> {
        let pool = self.get_or_create_pool(path).await?;
        Ok(pool.get_all_servers())
    }

    /// Sends a `.CheckServerStatus` call to every endpoint of every registered
    /// pool and records the outcome through `update_health_status`.
    ///
    /// Endpoints are probed one after another. A failed call marks that endpoint
    /// as `ServerActiveStatus::Fail` and does not abort the sweep; this method
    /// itself always returns `Ok(())`.
    pub async fn check_server_status(&self) -> Result<()> {
        // Snapshot the pool handles first: iterating the map directly would keep
        // a shard guard alive across the `.await` below, blocking writers to
        // that shard for the duration of every call and risking a deadlock.
        let pools: Vec<Arc<ServiceEndpointPool>> = self
            .service_pools
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();

        for pool in pools {
            for server in pool.get_all_servers() {
                // NOTE(review): 3000 is presumably a timeout in milliseconds; confirm
                // against `RpcEndpoint::sync_call`.
                let result = server
                    .sync_call(Bytes::from(".CheckServerStatus"), Bytes::new(), 3000)
                    .await;
                let status = if result.is_ok() {
                    ServerActiveStatus::Ok
                } else {
                    ServerActiveStatus::Fail
                };
                pool.update_health_status(server.address(), status);
            }
        }
        Ok(())
    }

    /// Refreshes the endpoints of the pool registered for `path`, if there is one.
    ///
    /// Paths without a registered pool are ignored.
    ///
    /// # Errors
    ///
    /// Propagates any error from the pool's `refresh_endpoints`.
    pub async fn on_path_evt_child_changed(&self, path: &str) -> Result<()> {
        // Clone the handle out so the map guard is released before awaiting.
        let pool: Option<Arc<ServiceEndpointPool>> = self
            .service_pools
            .get(path)
            .map(|entry| Arc::clone(entry.value()));

        if let Some(pool) = pool {
            info!("Path children changed: {}", path);
            pool.refresh_endpoints().await?;
        }
        Ok(())
    }

    /// Returns the shared client handle used by this selector.
    pub fn zoo_client(&self) -> &Arc<ZusZooClient> {
        &self.zoo_client
    }
}