use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::api::naming::{
Instance, InstanceRequest, InstanceResponse, QueryServiceResponse, ServiceListRequest,
ServiceListResponse, ServiceQueryRequest, SubscribeServiceRequest, SubscribeServiceResponse,
};
use crate::cache::FileCache;
use crate::common::{build_service_key, DEFAULT_GROUP};
use crate::error::{BatataError, Result};
use crate::naming::{
CallbackServiceListener, LoadBalancer, ServiceChangeEvent, ServiceInfoCache, ServiceListener,
SubscriberRegistry, WeightedRoundRobinBalancer,
};
use crate::remote::RpcClient;
use crate::CacheConfig;
/// Client-side entry point for service registration, discovery and
/// subscription against a single namespace.
///
/// All server communication goes through `rpc_client`; query results are kept
/// in an in-memory cache and, when configured, mirrored to a file cache that
/// is consulted as a failover source by `select_instances`.
pub struct NamingService {
/// RPC client used for every request sent to the server.
rpc_client: Arc<RpcClient>,
/// In-memory cache of service information, filled from query and subscribe responses.
cache: Arc<ServiceInfoCache>,
/// Optional on-disk cache; `None` when no cache directory is configured or
/// when `FileCache::new` failed during construction.
file_cache: Option<Arc<FileCache>>,
/// Cache settings; `failover_enabled` and `update_cache_when_empty` are read
/// when a server query fails.
cache_config: CacheConfig,
/// Strategy used by `select_one_healthy_instance` to pick a single instance.
balancer: Arc<dyn LoadBalancer>,
/// Registry of listeners added through `subscribe`.
subscribers: Arc<SubscriberRegistry>,
/// Namespace passed to every request and cache lookup.
namespace: String,
/// Group substituted whenever a caller passes an empty group name.
group_name: String,
/// Set by `start`, cleared by `stop`; guards against starting twice.
started: Arc<RwLock<bool>>,
/// Handle of the spawned heartbeat task, if one is running.
heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
/// Signal the heartbeat loop waits on in order to exit.
shutdown: Arc<Notify>,
/// `(service_name, group_name, instance)` for every ephemeral instance
/// registered through this client; the heartbeat loop re-sends these.
registered_instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
}
impl NamingService {
pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
Self::with_balancer(
rpc_client,
namespace,
cache_config,
Arc::new(WeightedRoundRobinBalancer::new()),
)
}
pub fn with_balancer(
rpc_client: Arc<RpcClient>,
namespace: &str,
cache_config: CacheConfig,
balancer: Arc<dyn LoadBalancer>,
) -> Self {
let file_cache = cache_config
.cache_dir
.as_ref()
.and_then(|dir| FileCache::new(dir).ok())
.map(Arc::new);
Self {
rpc_client,
cache: Arc::new(ServiceInfoCache::new()),
file_cache,
cache_config,
balancer,
subscribers: Arc::new(SubscriberRegistry::new()),
namespace: namespace.to_string(),
group_name: DEFAULT_GROUP.to_string(),
started: Arc::new(RwLock::new(false)),
heartbeat_task: Arc::new(RwLock::new(None)),
shutdown: Arc::new(Notify::new()),
registered_instances: Arc::new(RwLock::new(Vec::new())),
}
}
/// Builder-style setter for the group that is used whenever a method is
/// called with an empty group name. Consumes and returns the service.
pub fn with_group(mut self, group_name: &str) -> Self {
    self.group_name = group_name.to_owned();
    self
}
pub async fn start(&self) -> Result<()> {
if *self.started.read() {
return Err(BatataError::ClientAlreadyStarted);
}
*self.started.write() = true;
let instances = self.registered_instances.clone();
let rpc_client = self.rpc_client.clone();
let namespace = self.namespace.clone();
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
Self::heartbeat_loop(instances, rpc_client, namespace, shutdown).await;
});
*self.heartbeat_task.write() = Some(handle);
info!("NamingService started");
Ok(())
}
/// Stops the service: cancels the heartbeat task and deregisters every
/// instance that was registered through this client.
///
/// Deregistration is best effort; a failure for one instance is logged and
/// the remaining instances are still processed. The service can be started
/// again afterwards.
pub async fn stop(&self) {
    *self.started.write() = false;

    // Cancel the heartbeat task by aborting it. The shutdown `Notify` is
    // deliberately not signalled here: an aborted task never consumes the
    // notification, so it would stay behind as a stored permit and make the
    // heartbeat loop of the next `start()` exit immediately.
    let heartbeat = self.heartbeat_task.write().take();
    if let Some(handle) = heartbeat {
        handle.abort();
    }

    // Work on a snapshot so no lock is held across the `.await` below.
    let instances = self.registered_instances.read().clone();
    for (service_name, group_name, instance) in instances {
        if let Err(e) = self
            .deregister_instance(&service_name, &group_name, instance)
            .await
        {
            warn!("Failed to deregister instance on shutdown: {}", e);
        }
    }

    info!("NamingService stopped");
}
/// Registers `instance` under `service_name` / `group_name` on the server.
///
/// An empty `group_name` selects the service's default group. An instance id
/// is generated before the request is sent. Ephemeral instances are recorded
/// locally so that the heartbeat loop re-sends them and `stop` deregisters
/// them.
///
/// Registering an instance whose key matches an already recorded entry for
/// the same service and group replaces that entry instead of adding a second
/// one, so each instance is tracked (and heart-beaten) only once and always
/// with its latest data.
///
/// # Errors
///
/// Returns the transport error if the request fails, or a server error built
/// from the response when the server reports failure.
pub async fn register_instance(
    &self,
    service_name: &str,
    group_name: &str,
    instance: Instance,
) -> Result<()> {
    let group_name = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let mut instance = instance;
    instance.generate_instance_id();

    let request =
        InstanceRequest::register(&self.namespace, service_name, group_name, instance.clone());
    let response: InstanceResponse = self.rpc_client.request(&request).await?;
    if !response.response.success {
        return Err(BatataError::server_error(
            response.response.error_code,
            response.response.message,
        ));
    }

    if instance.ephemeral {
        let mut registered = self.registered_instances.write();
        let existing = registered.iter().position(|(s, g, i)| {
            s == service_name && g == group_name && i.key() == instance.key()
        });
        match existing {
            // Same instance registered again: keep one entry, newest data.
            Some(index) => registered[index].2 = instance,
            None => registered.push((
                service_name.to_string(),
                group_name.to_string(),
                instance,
            )),
        }
    }

    info!(
        "Registered instance: service={}, group={}",
        service_name, group_name
    );
    Ok(())
}
/// Registers an instance built from just `ip` and `port` in the default group.
///
/// # Errors
///
/// Propagates any error from [`NamingService::register_instance`].
pub async fn register_instance_simple(
    &self,
    service_name: &str,
    ip: &str,
    port: i32,
) -> Result<()> {
    let default_group = self.group_name.as_str();
    self.register_instance(service_name, default_group, Instance::new(ip, port))
        .await
}
/// Removes `instance` from `service_name` / `group_name` on the server and
/// drops the matching entries from the locally tracked instances.
///
/// An empty `group_name` selects the service's default group.
///
/// # Errors
///
/// Returns the transport error if the request fails, or a server error built
/// from the response when the server reports failure. The local list is left
/// untouched in both cases.
pub async fn deregister_instance(
    &self,
    service_name: &str,
    group_name: &str,
    instance: Instance,
) -> Result<()> {
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let request =
        InstanceRequest::deregister(&self.namespace, service_name, group, instance.clone());
    let reply: InstanceResponse = self.rpc_client.request(&request).await?;
    let outcome = reply.response;
    if !outcome.success {
        return Err(BatataError::server_error(
            outcome.error_code,
            outcome.message,
        ));
    }

    // Keep every tracked entry that differs in service, group or instance key.
    self.registered_instances.write().retain(|(s, g, i)| {
        s != service_name || g != group || i.key() != instance.key()
    });

    info!(
        "Deregistered instance: service={}, group={}",
        service_name, group
    );
    Ok(())
}
/// Deregisters the instance identified by `ip` and `port` from the default group.
///
/// # Errors
///
/// Propagates any error from [`NamingService::deregister_instance`].
pub async fn deregister_instance_simple(
    &self,
    service_name: &str,
    ip: &str,
    port: i32,
) -> Result<()> {
    let default_group = self.group_name.as_str();
    self.deregister_instance(service_name, default_group, Instance::new(ip, port))
        .await
}
/// Sends updated data for `instance` to the server and, on success, replaces
/// the first locally tracked entry with the same service, group and instance
/// key.
///
/// An empty `group_name` selects the service's default group.
///
/// # Errors
///
/// Returns the transport error if the request fails, or a server error built
/// from the response when the server reports failure.
pub async fn update_instance(
    &self,
    service_name: &str,
    group_name: &str,
    instance: Instance,
) -> Result<()> {
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let request = InstanceRequest::update(&self.namespace, service_name, group, instance.clone());
    let reply: InstanceResponse = self.rpc_client.request(&request).await?;
    let outcome = reply.response;
    if !outcome.success {
        return Err(BatataError::server_error(
            outcome.error_code,
            outcome.message,
        ));
    }

    {
        let mut tracked = self.registered_instances.write();
        let matching = tracked
            .iter_mut()
            .find(|(s, g, i)| s == service_name && g == group && i.key() == instance.key());
        if let Some((_, _, slot)) = matching {
            *slot = instance.clone();
        }
    }

    info!(
        "Updated instance: service={}, group={}",
        service_name, group
    );
    Ok(())
}
/// Updates weight and enabled flag of the instance identified by `ip` and
/// `port` in the default group.
///
/// # Errors
///
/// Propagates any error from [`NamingService::update_instance`].
pub async fn update_instance_simple(
    &self,
    service_name: &str,
    ip: &str,
    port: i32,
    weight: f64,
    enabled: bool,
) -> Result<()> {
    let base = Instance::new(ip, port);
    let updated = base.with_weight(weight).with_enabled(enabled);
    let default_group = self.group_name.as_str();
    self.update_instance(service_name, default_group, updated)
        .await
}
/// Queries the server for `service_name` restricted to `clusters`, stores the
/// result in the in-memory cache and returns it.
///
/// An empty `group_name` selects the service's default group. The cluster
/// names are sent as a single comma-separated string.
///
/// # Errors
///
/// Returns the transport error if the request fails.
pub async fn get_service(
    &self,
    service_name: &str,
    group_name: &str,
    clusters: &[String],
) -> Result<crate::api::naming::Service> {
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let joined_clusters = clusters.join(",");
    let query = ServiceQueryRequest::new(&self.namespace, service_name, group)
        .with_cluster(&joined_clusters);

    let reply: QueryServiceResponse = self.rpc_client.request(&query).await?;
    let service = reply.service_info;
    self.cache.put(&self.namespace, service.clone());
    Ok(service)
}
/// Returns every known instance of the service, regardless of health.
///
/// # Errors
///
/// Propagates any error from [`NamingService::select_instances`].
pub async fn get_all_instances(
    &self,
    service_name: &str,
    group_name: &str,
) -> Result<Vec<Instance>> {
    const HEALTHY_ONLY: bool = false;
    self.select_instances(service_name, group_name, HEALTHY_ONLY)
        .await
}
pub async fn select_instances(
&self,
service_name: &str,
group_name: &str,
healthy_only: bool,
) -> Result<Vec<Instance>> {
let group_name = if group_name.is_empty() {
&self.group_name
} else {
group_name
};
if let Some(service) = self.cache.get(&self.namespace, group_name, service_name) {
let instances = if healthy_only {
service
.hosts
.into_iter()
.filter(|i| i.healthy && i.enabled)
.collect()
} else {
service.hosts
};
return Ok(instances);
}
let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
.with_healthy_only(healthy_only);
match self
.rpc_client
.request::<_, QueryServiceResponse>(&request)
.await
{
Ok(response) => {
self.cache
.put(&self.namespace, response.service_info.clone());
if let Some(file_cache) = &self.file_cache {
if let Err(e) = file_cache.save_service(&self.namespace, &response.service_info)
{
warn!("Failed to save service to file cache: {}", e);
}
}
let instances = if healthy_only {
response
.service_info
.hosts
.into_iter()
.filter(|i| i.healthy && i.enabled)
.collect()
} else {
response.service_info.hosts
};
Ok(instances)
}
Err(e) => {
if self.cache_config.failover_enabled {
if let Some(file_cache) = &self.file_cache {
if let Some(service) =
file_cache.load_service(&self.namespace, group_name, service_name)
{
warn!(
"Using cached service due to server error: {} (service={}, group={})",
e, service_name, group_name
);
if self.cache_config.update_cache_when_empty {
self.cache.put(&self.namespace, service.clone());
}
let instances = if healthy_only {
service
.hosts
.into_iter()
.filter(|i| i.healthy && i.enabled)
.collect()
} else {
service.hosts
};
return Ok(instances);
}
}
}
Err(e)
}
}
}
/// Picks a single healthy, enabled instance of the service using the
/// configured load balancer.
///
/// The full instance list is obtained through
/// [`NamingService::select_instances`] and then narrowed locally to instances
/// that are healthy and enabled, so the balancer can never hand back an
/// instance that is down or disabled.
///
/// An empty `group_name` selects the service's default group.
///
/// # Errors
///
/// Returns [`BatataError::ServiceNotFound`] when no healthy, enabled instance
/// exists or the balancer selects nothing, and propagates any error from the
/// instance lookup.
pub async fn select_one_healthy_instance(
    &self,
    service_name: &str,
    group_name: &str,
) -> Result<Instance> {
    let group_name = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let mut instances = self.select_instances(service_name, group_name, false).await?;
    // Same health predicate the rest of this type uses for `healthy_only`.
    instances.retain(|i| i.healthy && i.enabled);

    let not_found = || BatataError::ServiceNotFound {
        service_name: service_name.to_string(),
        group_name: group_name.to_string(),
        namespace: self.namespace.clone(),
    };

    if instances.is_empty() {
        return Err(not_found());
    }

    let service_key = build_service_key(service_name, group_name, &self.namespace);
    self.balancer
        .select(&service_key, &instances)
        .ok_or_else(not_found)
}
/// Fetches one page of service names for a group from the server.
///
/// An empty `group_name` selects the service's default group. Returns the
/// `count` reported by the server together with the service names on the
/// requested page.
///
/// # Errors
///
/// Returns the transport error if the request fails.
pub async fn get_services_of_server(
    &self,
    group_name: &str,
    page_no: i32,
    page_size: i32,
) -> Result<(i32, Vec<String>)> {
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    let listing = ServiceListRequest::new(&self.namespace, group).with_page(page_no, page_size);
    let reply: ServiceListResponse = self.rpc_client.request(&listing).await?;
    let total = reply.count;
    let names = reply.service_names;
    Ok((total, names))
}
/// Adds `listener` for the given service and subscribes to it on the server.
///
/// The listener is added to the local registry first; the service
/// information returned by the server is then stored in the in-memory cache.
/// An empty `group_name` selects the service's default group.
///
/// # Errors
///
/// Returns the transport error if the subscribe request fails. The listener
/// has already been added locally at that point.
pub async fn subscribe<L>(
    &self,
    service_name: &str,
    group_name: &str,
    listener: L,
) -> Result<()>
where
    L: ServiceListener + 'static,
{
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    self.subscribers
        .subscribe(&self.namespace, group, service_name, Arc::new(listener));

    let request = SubscribeServiceRequest::subscribe(&self.namespace, service_name, group);
    let reply: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
    self.cache.put(&self.namespace, reply.service_info.clone());

    info!(
        "Subscribed to service: service={}, group={}",
        service_name, group
    );
    Ok(())
}
/// Subscribes to a service using a plain closure, which is wrapped in a
/// [`CallbackServiceListener`].
///
/// # Errors
///
/// Propagates any error from [`NamingService::subscribe`].
pub async fn subscribe_callback<F>(
    &self,
    service_name: &str,
    group_name: &str,
    callback: F,
) -> Result<()>
where
    F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
{
    let listener = CallbackServiceListener::new(callback);
    self.subscribe(service_name, group_name, listener).await
}
/// Removes the local subscription for the given service and sends an
/// unsubscribe request to the server.
///
/// An empty `group_name` selects the service's default group.
///
/// # Errors
///
/// Returns the transport error if the unsubscribe request fails. The local
/// subscription has already been removed at that point.
pub async fn unsubscribe(&self, service_name: &str, group_name: &str) -> Result<()> {
    let group = if group_name.is_empty() {
        self.group_name.as_str()
    } else {
        group_name
    };

    self.subscribers
        .unsubscribe(&self.namespace, group, service_name);

    let request = SubscribeServiceRequest::unsubscribe(&self.namespace, service_name, group);
    // Only the success of the call matters; the payload is not used.
    let _ack: SubscribeServiceResponse = self.rpc_client.request(&request).await?;

    info!(
        "Unsubscribed from service: service={}, group={}",
        service_name, group
    );
    Ok(())
}
/// Reports `"UP"` when the RPC client says it is connected and `"DOWN"`
/// otherwise. This never returns an error.
pub async fn get_server_status(&self) -> Result<String> {
    let status = match self.rpc_client.is_connected() {
        true => "UP",
        false => "DOWN",
    };
    Ok(status.to_string())
}
/// Background task that re-sends a register request for every tracked
/// ephemeral instance every five seconds until `shutdown` is signalled.
///
/// A heartbeat counts as failed both when the request itself fails and when
/// the server answers with an unsuccessful response; in either case a
/// warning is logged and the loop carries on with the next instance.
async fn heartbeat_loop(
    instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
    rpc_client: Arc<RpcClient>,
    namespace: String,
    shutdown: Arc<Notify>,
) {
    let heartbeat_interval = Duration::from_secs(5);
    loop {
        tokio::select! {
            _ = shutdown.notified() => {
                info!("Heartbeat loop shutdown");
                break;
            }
            _ = tokio::time::sleep(heartbeat_interval) => {
                // Snapshot the list so the lock is not held across `.await`.
                let registered = instances.read().clone();
                for (service_name, group_name, instance) in registered {
                    if !instance.ephemeral {
                        continue;
                    }
                    let request = InstanceRequest::register(
                        &namespace,
                        &service_name,
                        &group_name,
                        instance,
                    );
                    match rpc_client.request::<_, InstanceResponse>(&request).await {
                        Ok(reply) if reply.response.success => {
                            debug!(
                                "Heartbeat sent for service={}, group={}",
                                service_name, group_name
                            );
                        }
                        Ok(reply) => {
                            // The request went through but the server refused it.
                            let rejection = BatataError::server_error(
                                reply.response.error_code,
                                reply.response.message,
                            );
                            warn!(
                                "Heartbeat failed for service={}, group={}: {}",
                                service_name, group_name, rejection
                            );
                        }
                        Err(e) => {
                            warn!(
                                "Heartbeat failed for service={}, group={}: {}",
                                service_name, group_name, e
                            );
                        }
                    }
                }
            }
        }
    }
}
}
impl Drop for NamingService {
fn drop(&mut self) {
self.shutdown.notify_one();
}
}