use nacos_rust_client::client::config_client::{listener::ConfigListener, ConfigKey};
use nacos_rust_client::client::naming_client::Instance;
use crate::config::{NacosConfig, ServerConfig};
pub async fn register_service(
config: &NacosConfig,
server_config: &ServerConfig,
) -> Result<(), anyhow::Error> {
let naming_client = super::client::get_naming_client()
.map_err(|e| anyhow::anyhow!("获取 Nacos 命名客户端失败: {}", e))?;
let service_ip = config
.service_ip
.as_ref()
.unwrap_or(&server_config.addr)
.clone();
let service_ip = if service_ip == "0.0.0.0" {
get_local_ip().unwrap_or_else(|| {
tracing::warn!("无法自动获取本机 IP,Nacos 注册将使用 127.0.0.1");
"127.0.0.1".to_string()
})
} else {
service_ip
};
let service_port = config.service_port.unwrap_or(server_config.port as u32);
let mut metadata = std::collections::HashMap::new();
if let Some(ref meta) = config.metadata {
metadata.extend(meta.clone());
}
if let Some(ref health_path) = config.health_check_path {
metadata.insert("health_check_path".to_string(), health_path.clone());
}
let service_name = if !config.service_name.is_empty() {
config.service_name.clone()
} else {
std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown-service".to_string())
};
let mut instance =
Instance::new_simple(&service_ip, service_port, &service_name, config.effective_naming_group());
if !metadata.is_empty() {
instance.metadata = Some(metadata.clone());
}
naming_client.register(instance);
tracing::info!(
"服务注册成功: {}:{} (服务名: {}, 组: {}, 命名空间: {:?}, 元数据: {:?})",
service_ip,
service_port,
service_name,
config.effective_naming_group(),
config.effective_naming_namespace(),
metadata
);
Ok(())
}
pub async fn deregister_service(
config: &NacosConfig,
server_config: &ServerConfig,
) -> Result<(), anyhow::Error> {
let naming_client = super::client::get_naming_client()
.map_err(|e| anyhow::anyhow!("获取 Nacos 命名客户端失败: {}", e))?;
let service_name = if !config.service_name.is_empty() {
config.service_name.clone()
} else {
std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown-service".to_string())
};
let service_ip = config
.service_ip
.as_ref()
.unwrap_or(&server_config.addr)
.clone();
let service_port = config.service_port.unwrap_or(server_config.port as u32);
let instance =
Instance::new_simple(&service_ip, service_port, &service_name, config.effective_naming_group());
naming_client.unregister(instance);
tracing::info!(
"服务注销成功: {}:{} (服务名: {}, 组: {}, 命名空间: {:?})",
service_ip,
service_port,
service_name,
config.effective_naming_group(),
config.effective_naming_namespace()
);
Ok(())
}
/// Subscribes to instance changes for every service listed in `config.subscribe_services`.
///
/// For each service the current instance list is queried first and pushed into
/// `super::update_service_instances`; a failed query is only logged as a warning. A listener is
/// then registered which, on every change notification, logs the change, forwards the full
/// instance list to `super::update_service_instances` and logs each instance.
///
/// Returns immediately with `Ok(())` when no services are configured, before the naming client
/// is looked up.
///
/// # Errors
///
/// Returns an error when the naming client cannot be obtained or when subscribing to any
/// service fails; services later in the list are not processed after a failure.
pub async fn subscribe_services(config: &NacosConfig) -> Result<(), anyhow::Error> {
    use nacos_rust_client::client::naming_client::{
        Instance, InstanceListener, QueryInstanceListParams, ServiceInstanceKey,
    };
    use std::sync::Arc;

    /// Listener bound to one service name / group pair.
    struct ServiceChangeListener {
        service_name: String,
        group_name: String,
    }

    impl InstanceListener for ServiceChangeListener {
        fn get_key(&self) -> ServiceInstanceKey {
            ServiceInstanceKey::new(&self.service_name, &self.group_name)
        }

        fn change(
            &self,
            _key: &ServiceInstanceKey,
            value: &Vec<Arc<Instance>>,
            add_list: &Vec<Arc<Instance>>,
            remove_list: &Vec<Arc<Instance>>,
        ) {
            tracing::info!(
                "服务 {} 实例列表发生变化 (全量:{} 新增:{} 移除:{})",
                self.service_name,
                value.len(),
                add_list.len(),
                remove_list.len()
            );
            // Hand the full list over before logging the individual entries.
            super::update_service_instances(&self.service_name, value.clone());
            value.iter().for_each(|entry| {
                tracing::info!(
                    "  - 服务实例: {}:{} (健康状态: {})",
                    entry.ip,
                    entry.port,
                    entry.healthy
                );
            });
        }
    }

    if config.subscribe_services.is_empty() {
        return Ok(());
    }

    let naming_client = super::client::get_naming_client()
        .map_err(|e| anyhow::anyhow!("获取 Nacos 命名客户端失败: {}", e))?;
    let group = config.effective_naming_group();

    for service_name in &config.subscribe_services {
        // Seed the local view with the current instances; failure here is non-fatal.
        let params = QueryInstanceListParams::new_simple(service_name, group);
        match naming_client.query_instances(params).await {
            Err(err) => {
                tracing::warn!("获取服务 {} 的当前实例列表失败: {}", service_name, err);
            }
            Ok(current) => {
                tracing::info!(
                    "获取服务 {} 的当前实例列表成功,实例数量: {}",
                    service_name,
                    current.len()
                );
                super::update_service_instances(service_name, current.clone());
                current.iter().for_each(|entry| {
                    tracing::info!(
                        "  - 服务实例: {}:{} (健康状态: {})",
                        entry.ip,
                        entry.port,
                        entry.healthy
                    );
                });
            }
        }

        naming_client
            .subscribe(Box::new(ServiceChangeListener {
                service_name: service_name.clone(),
                group_name: group.to_string(),
            }))
            .await
            .map_err(|e| anyhow::anyhow!("订阅服务 {} 失败: {}", service_name, e))?;

        tracing::info!(
            "订阅服务成功: {} (命名空间: {:?}, 组: {})",
            service_name,
            config.effective_naming_namespace(),
            group
        );
    }
    Ok(())
}
/// Subscribes to change notifications for every entry in `config.subscribe_configs`.
///
/// Each entry gets its own listener keyed by data id, group and namespace. On a change the
/// listener logs the event (content at debug level) and forwards the new value to
/// `super::update_config`, reporting an empty tenant as `public`.
///
/// Returns immediately with `Ok(())` when no configs are configured, before the config client
/// is looked up.
///
/// # Errors
///
/// Returns an error when the config client cannot be obtained or when subscribing to any
/// entry fails; entries later in the list are not processed after a failure.
pub async fn subscribe_configs(config: &NacosConfig) -> Result<(), anyhow::Error> {
    /// Listener bound to one data id / group / namespace triple.
    struct ConfigChangeListener {
        data_id: String,
        group: String,
        namespace: String,
    }

    impl ConfigListener for ConfigChangeListener {
        fn get_key(&self) -> ConfigKey {
            ConfigKey::new(&self.data_id, &self.group, &self.namespace)
        }

        fn change(&self, key: &ConfigKey, value: &str) {
            // An empty tenant is reported under the name "public".
            let namespace: &str = if key.tenant.is_empty() {
                "public"
            } else {
                &key.tenant
            };
            tracing::info!(
                "配置变更: {} (组: {}, 命名空间: {}, 值长度: {} 字节)",
                key.data_id,
                key.group,
                namespace,
                value.len()
            );
            tracing::debug!("配置内容: {}", value);
            super::update_config(&key.data_id, &key.group, namespace, value.to_string());
        }
    }

    if config.subscribe_configs.is_empty() {
        return Ok(());
    }

    let config_client = super::client::get_config_client()
        .map_err(|e| anyhow::anyhow!("获取 Nacos 配置客户端失败: {}", e))?;

    for item in &config.subscribe_configs {
        let listener = ConfigChangeListener {
            data_id: item.data_id.clone(),
            group: item.group.clone(),
            namespace: item.namespace.clone(),
        };
        config_client
            .subscribe(Box::new(listener))
            .await
            .map_err(|e| anyhow::anyhow!("订阅配置 {} 失败: {}", item.data_id, e))?;

        let namespace_label: &str = if item.namespace.is_empty() {
            "public"
        } else {
            &item.namespace
        };
        tracing::info!(
            "订阅配置成功: {} (组: {}, 命名空间: {})",
            item.data_id,
            item.group,
            namespace_label
        );
    }
    Ok(())
}
/// Best-effort detection of a local IP address for this host.
///
/// Binds a UDP socket to `0.0.0.0:0`, connects it to `8.8.8.8:80` and returns the IP part of
/// the socket's local address as a string. This function writes no data to the socket; the
/// connect is presumably there only so that the socket gets a concrete local address.
///
/// Returns `None` when binding, connecting or reading the local address fails.
fn get_local_ip() -> Option<String> {
    let probe = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
    probe.connect("8.8.8.8:80").ok()?;
    match probe.local_addr() {
        Ok(local) => Some(local.ip().to_string()),
        Err(_) => None,
    }
}