use fusen_register::{
Register,
directory::Directory,
error::RegisterError,
fusen_internal_common::{BoxFuture, protocol::Protocol, resource::service::ServiceResource},
};
use nacos_sdk::api::{
naming::{
NamingChangeEvent, NamingEventListener, NamingService, NamingServiceBuilder,
ServiceInstance,
},
props::ClientProps,
};
use std::sync::Arc;
use tracing::{error, info};
use crate::nacos::NacosConfig;
#[derive(Clone)]
pub struct NacosRegister {
pub naming_service: Arc<NamingService>,
}
impl NacosRegister {
pub fn init_nacos_register(
app_name: &str,
config: Arc<NacosConfig>,
) -> Result<NacosRegister, crate::error::Error> {
let mut client_props = ClientProps::new();
client_props = client_props
.server_addr(config.server_addr.clone())
.namespace(config.namespace.clone().unwrap_or_default())
.app_name(app_name)
.auth_username(config.username.clone().unwrap_or_default())
.auth_password(config.password.clone().unwrap_or_default());
let builder = NamingServiceBuilder::new(client_props);
let builder = if config.username.is_some() {
builder.enable_auth_plugin_http()
} else {
builder
};
let naming_service = Arc::new(
builder
.build()
.map_err(|error| crate::error::Error::RegisterError(Box::new(error)))?,
);
let nacos_register = NacosRegister {
naming_service: naming_service.clone(),
};
Ok(nacos_register)
}
}
impl Register for NacosRegister {
fn register(
&self,
resource: Arc<ServiceResource>,
protocol: Protocol,
) -> BoxFuture<Result<(), RegisterError>> {
let nacos = self.clone();
Box::pin(async move {
let service_name = get_service_name(resource.as_ref(), &protocol);
let instance = build_instance(resource.as_ref());
info!(
"nacos register service: {service_name} - group: {:?}",
resource.group
);
let ret = nacos
.naming_service
.register_instance(service_name, resource.group.clone(), instance)
.await;
if let Err(error) = ret {
error!("nacos register to nacos occur an error: {error:?}");
return Err(RegisterError::Error(Box::new(error)));
}
Ok(())
})
}
fn deregister(
&self,
resource: Arc<ServiceResource>,
protocol: Protocol,
) -> BoxFuture<Result<(), RegisterError>> {
let nacos = self.clone();
Box::pin(async move {
let service_name = get_service_name(resource.as_ref(), &protocol);
let instance = build_instance(resource.as_ref());
info!(
"nacos deregister service: {service_name} - group: {:?}",
resource.group
);
let ret = nacos
.naming_service
.deregister_instance(service_name, resource.group.clone(), instance)
.await;
if let Err(error) = ret {
error!("nacos deregister to nacos occur an error: {error:?}",);
return Err(RegisterError::Error(Box::new(error)));
}
Ok(())
})
}
fn subscribe(
&self,
resource: ServiceResource,
protocol: Protocol,
) -> BoxFuture<Result<Directory, RegisterError>> {
let nacos = self.clone();
Box::pin(async move {
let service_name = get_service_name(&resource, &protocol);
info!(
"subscribe service: {service_name} - grep: {:?}",
resource.group
);
let directory = Directory::default();
let directory_clone = directory.clone();
let naming_service = nacos.naming_service.clone();
let service_instances = naming_service
.get_all_instances(
service_name.clone(),
resource.group.clone(),
Vec::new(),
false,
)
.await
.map_err(|error| RegisterError::Error(Box::new(error)))?;
let service_instances = to_service_resources(service_instances);
directory.change(service_instances).await?;
let event_listener = ServiceChangeListener::new(directory);
let event_listener = Arc::new(event_listener);
naming_service
.subscribe(
service_name,
resource.group.clone(),
Vec::new(),
event_listener,
)
.await
.map_err(|error| RegisterError::Error(Box::new(error)))?;
Ok(directory_clone)
})
}
}
#[derive(Clone)]
struct ServiceChangeListener {
directory: Directory,
}
impl ServiceChangeListener {
fn new(directory: Directory) -> Self {
Self { directory }
}
}
impl NamingEventListener for ServiceChangeListener {
fn event(&self, event: Arc<NamingChangeEvent>) {
info!("service change: {}", event.service_name);
info!("nacos event: {event:?}");
let directory = self.directory.clone();
let instances = event.instances.to_owned();
tokio::spawn(async move {
let instances = instances;
let resources = if let Some(instances) = instances {
to_service_resources(instances)
} else {
vec![]
};
let _ = directory.change(resources).await;
});
}
}
fn to_service_resources(service_instances: Vec<ServiceInstance>) -> Vec<ServiceResource> {
service_instances.into_iter().fold(vec![], |mut vec, e| {
let resource = ServiceResource {
addr: format!("http://{}:{}", e.ip(), e.port),
service_id: e.service_name.unwrap_or_default(),
group: None,
version: None,
methods: Default::default(),
weight: Some(e.weight),
metadata: e.metadata,
};
vec.push(resource);
vec
})
}
pub fn get_service_name(resource: &ServiceResource, protocol: &Protocol) -> String {
match &protocol {
&Protocol::SpringCloud(app_name) => app_name.clone(),
&Protocol::Dubbo | Protocol::Fusen => format!(
"providers:{}:{}:{}",
resource.service_id,
resource.version.as_ref().map_or("", |e| e),
resource.group.as_ref().map_or("", |e| e),
),
_ => unimplemented!(),
}
}
fn build_instance(resource: &ServiceResource) -> ServiceInstance {
let (ip, port) = resource.addr.split_once(':').unwrap();
nacos_sdk::api::naming::ServiceInstance {
ip: ip.to_string(),
port: port.parse().unwrap(),
metadata: resource.metadata.clone(),
..Default::default()
}
}