use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use tracing::{debug, info, instrument};
use crate::api::error::Error::ErrResult;
use crate::api::error::Result;
use crate::api::naming::{InstanceChooser, NamingEventListener, ServiceInstance};
use crate::api::plugin::AuthPlugin;
use crate::api::props::ClientProps;
use crate::common::cache::{Cache, CacheBuilder};
use crate::common::executor;
use crate::common::remote::grpc::NacosGrpcClient;
use crate::common::remote::grpc::NacosGrpcClientBuilder;
use crate::common::remote::grpc::message::GrpcRequestMessage;
use crate::common::remote::grpc::message::GrpcResponseMessage;
use crate::naming::message::request::*;
use crate::naming::message::response::*;
use self::chooser::RandomWeightChooser;
use self::dto::ServiceInfo;
use self::handler::NamingPushRequestHandler;
use self::observable::service_info_observable::ServiceInfoObserver;
use self::redo::{AutomaticRequest, NamingRedoTask, RedoTask, RedoTaskExecutor};
use self::updater::ServiceInfoUpdater;
mod chooser;
mod dto;
mod handler;
mod message;
mod observable;
mod redo;
mod updater;
pub(crate) struct NacosNamingService {
nacos_grpc_client: Arc<NacosGrpcClient>,
namespace: String,
redo_task_executor: Arc<RedoTaskExecutor>,
service_info_updater: ServiceInfoUpdater,
client_id: String,
naming_cache: Arc<Cache<ServiceInfo>>,
observer: ServiceInfoObserver,
}
impl std::fmt::Debug for NacosNamingService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NacosNamingService")
.field("namespace", &self.namespace)
.field("client_id", &self.client_id)
.finish()
}
}
const MODULE_NAME: &str = "naming";
static SEQ: AtomicU64 = AtomicU64::new(1);
impl NacosNamingService {
pub(crate) async fn new(
client_props: ClientProps,
auth_plugin: Arc<dyn AuthPlugin>,
) -> Result<Self> {
let server_list = Arc::new(client_props.get_server_list()?);
let mut namespace = client_props.get_namespace();
if namespace.is_empty() {
namespace = crate::api::constants::DEFAULT_NAMESPACE.to_owned();
}
let client_id = crate::common::util::generate_client_id(
MODULE_NAME,
&client_props.get_server_addr(),
&namespace,
&SEQ,
);
let redo_task_executor = Arc::new(RedoTaskExecutor::new(client_id.clone()));
let redo_task_executor_on_connected = redo_task_executor.clone();
let redo_task_executor_on_disconnected = redo_task_executor.clone();
let naming_cache: Cache<ServiceInfo> = CacheBuilder::naming(namespace.clone())
.load_cache_at_start(client_props.get_naming_load_cache_at_start())
.disk_store()
.build()
.await;
let naming_cache = Arc::new(naming_cache);
let (observer, emitter) = observable::service_info_observable::create(
client_id.clone(),
naming_cache.clone(),
client_props.get_naming_push_empty_protection(),
);
let server_request_handler = NamingPushRequestHandler::new(emitter.clone());
let nacos_grpc_client = NacosGrpcClientBuilder::new(server_list.to_vec())
.port(client_props.get_remote_grpc_port())
.namespace(namespace.clone())
.client_version(client_props.get_client_version())
.support_remote_connection(true)
.support_config_remote_metrics(true)
.support_naming_delta_push(false)
.support_naming_remote_metric(false)
.add_labels(client_props.get_labels())
.add_label(
crate::api::constants::common_remote::LABEL_SOURCE.to_owned(),
crate::api::constants::common_remote::LABEL_SOURCE_SDK.to_owned(),
)
.add_label(
crate::api::constants::common_remote::LABEL_MODULE.to_owned(),
crate::api::constants::common_remote::LABEL_MODULE_NAMING.to_owned(),
)
.app_name(client_props.get_app_name())
.register_server_request_handler::<NotifySubscriberRequest>(Arc::new(
server_request_handler,
))
.connected_listener(move |connection_id| {
info!("connection {} connected.", connection_id);
let redo = redo_task_executor_on_connected.clone();
executor::spawn(async move {
redo.on_grpc_client_reconnect().await;
});
})
.disconnected_listener(move |connection_id| {
info!("connection {} disconnected.", connection_id);
let redo = redo_task_executor_on_disconnected.clone();
executor::spawn(async move {
redo.on_grpc_client_disconnect().await;
});
})
.auth_context(client_props.get_auth_context())
.auth_plugin(auth_plugin)
.max_retries(client_props.get_max_retries())
.emergency_start(client_props.get_naming_load_cache_at_start())
.build(client_id.clone())
.await?;
let nacos_grpc_client = Arc::new(nacos_grpc_client);
let service_info_updater =
ServiceInfoUpdater::new(emitter, naming_cache.clone(), nacos_grpc_client.clone());
Ok(NacosNamingService {
redo_task_executor,
nacos_grpc_client,
namespace,
service_info_updater,
client_id,
naming_cache,
observer,
})
}
async fn request_to_server<R, P>(&self, request: R) -> Result<P>
where
R: GrpcRequestMessage + 'static,
P: GrpcResponseMessage + 'static,
{
self.nacos_grpc_client.send_request::<R, P>(request).await
}
async fn execute_by_register_with_redo<R, P>(&self, request: R, operation: &str) -> Result<()>
where
R: GrpcRequestMessage + AutomaticRequest + Clone + 'static,
P: GrpcResponseMessage + 'static,
{
let auto_request: Arc<dyn AutomaticRequest> = Arc::new(request.clone());
let redo_task = NamingRedoTask::new(self.nacos_grpc_client.clone(), auto_request);
let redo_task = Arc::new(redo_task);
redo_task.active();
self.redo_task_executor.add_task(redo_task.clone()).await;
let response = self.request_to_server::<R, P>(request).await?;
crate::common::error::handle_response(&response, operation)?;
redo_task.frozen();
Ok(())
}
async fn execute_by_deregister_with_redo<R, P>(&self, request: R, operation: &str) -> Result<()>
where
R: GrpcRequestMessage + AutomaticRequest + Clone + 'static,
P: GrpcResponseMessage + 'static,
{
let auto_request: Arc<dyn AutomaticRequest> = Arc::new(request.clone());
let redo_task = NamingRedoTask::new(self.nacos_grpc_client.clone(), auto_request);
let response = self.request_to_server::<R, P>(request).await?;
crate::common::error::handle_response(&response, operation)?;
self.redo_task_executor
.remove_task(redo_task.task_key().as_str())
.await;
Ok(())
}
}
impl NacosNamingService {
async fn register_ephemeral_instance_async(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
info!(
"register ephemeral instance: service_name: {service_name}, group_name: {group_name:?}"
);
let namespace = Some(self.namespace.clone());
let group_name = crate::common::util::normalize_group_name(group_name);
let request = InstanceRequest::register(
service_instance,
Some(service_name),
namespace,
Some(group_name),
);
self.execute_by_register_with_redo::<InstanceRequest, InstanceResponse>(
request,
"register_ephemeral_instance",
)
.await
}
async fn register_persistent_instance_async(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
info!(
"register persistent instance: service_name: {service_name}, group_name: {group_name:?}"
);
let namespace = Some(self.namespace.clone());
let group_name = crate::common::util::normalize_group_name(group_name);
let request = PersistentInstanceRequest::register(
service_instance,
Some(service_name),
namespace,
Some(group_name),
);
self.execute_by_register_with_redo::<PersistentInstanceRequest, InstanceResponse>(
request,
"register_persistent_instance",
)
.await
}
async fn deregister_ephemeral_instance_async(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
info!(
"deregister ephemeral instance: service_name: {service_name}, group_name: {group_name:?}"
);
let namespace = Some(self.namespace.clone());
let group_name = crate::common::util::normalize_group_name(group_name);
let request = InstanceRequest::deregister(
service_instance,
Some(service_name),
namespace,
Some(group_name),
);
self.execute_by_deregister_with_redo::<InstanceRequest, InstanceResponse>(
request,
"deregister_ephemeral_instance",
)
.await
}
async fn deregister_persistent_instance_async(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
info!(
"deregister persistent instance: service_name: {service_name}, group_name: {group_name:?}"
);
let namespace = Some(self.namespace.clone());
let group_name = crate::common::util::normalize_group_name(group_name);
let request = PersistentInstanceRequest::deregister(
service_instance,
Some(service_name),
namespace,
Some(group_name),
);
self.execute_by_deregister_with_redo::<PersistentInstanceRequest, InstanceResponse>(
request,
"deregister_persistent_instance",
)
.await
}
async fn batch_register_instance_async(
&self,
service_name: String,
group_name: Option<String>,
service_instances: Vec<ServiceInstance>,
) -> Result<()> {
let namespace = Some(self.namespace.clone());
let group_name = crate::common::util::normalize_group_name(group_name);
let request = BatchInstanceRequest::new(
service_instances,
namespace,
Some(service_name),
Some(group_name),
);
self.execute_by_register_with_redo::<BatchInstanceRequest, BatchInstanceResponse>(
request,
"batch_register_instance",
)
.await
}
async fn get_all_instances_async(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
) -> Result<Vec<ServiceInstance>> {
let cluster_str = clusters.join(",");
let group_name = crate::common::util::normalize_group_name(group_name);
let service_info;
if subscribe {
let grouped_name = ServiceInfo::get_grouped_service_name(&service_name, &group_name);
let key = ServiceInfo::get_key(&grouped_name, &cluster_str);
if let Some(cached_info) = self.naming_cache.get(&key) {
info!("get_all_instances returned cached service_info, group_key={key}");
service_info = Some(cached_info.clone());
} else {
let subscribe_service_info = self
.subscribe_async(service_name, Some(group_name), clusters, None)
.await;
if let Ok(subscribe_service_info) = subscribe_service_info {
service_info = Some(subscribe_service_info);
} else {
service_info = None;
}
}
} else {
let request = ServiceQueryRequest {
cluster: cluster_str,
group_name: Some(group_name),
healthy_only: false,
udp_port: 0,
namespace: Some(self.namespace.clone()),
service_name: Some(service_name),
..Default::default()
};
let response = self
.request_to_server::<ServiceQueryRequest, QueryServiceResponse>(request)
.await?;
crate::common::error::handle_response(&response, "get_all_instances")?;
service_info = Some(response.service_info);
}
let Some(service_info) = service_info else {
return Ok(Vec::default());
};
let instances = service_info.hosts;
let Some(instances) = instances else {
return Ok(Vec::default());
};
Ok(instances)
}
async fn select_instances_async(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
healthy: bool,
) -> Result<Vec<ServiceInstance>> {
let group_name = crate::common::util::normalize_group_name(group_name);
let all_instance = self
.get_all_instances_async(service_name, Some(group_name), clusters, subscribe)
.await?;
let ret: Vec<ServiceInstance> = all_instance
.into_iter()
.filter(|instance| {
healthy == instance.healthy && instance.enabled && instance.weight > 0.0
})
.collect();
Ok(ret)
}
async fn select_one_healthy_instance_async(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
) -> Result<ServiceInstance> {
let group_name = crate::common::util::normalize_group_name(group_name);
let service_name_for_tip = service_name.clone();
let ret = self
.select_instances_async(
service_name.clone(),
Some(group_name),
clusters,
subscribe,
true,
)
.await?;
let chooser = RandomWeightChooser::new(service_name, ret)?;
let Some(instance) = chooser.choose() else {
return Err(ErrResult(format!(
"no available {service_name_for_tip} service instance can be selected"
)));
};
Ok(instance)
}
async fn get_service_list_async(
&self,
page_no: i32,
page_size: i32,
group_name: Option<String>,
) -> Result<(Vec<String>, i32)> {
let group_name = crate::common::util::normalize_group_name(group_name);
let namespace = Some(self.namespace.clone());
let request = ServiceListRequest {
page_no,
page_size,
group_name: Some(group_name),
namespace,
..Default::default()
};
let response = self
.request_to_server::<ServiceListRequest, ServiceListResponse>(request)
.await?;
crate::common::error::handle_response(&response, "get_service_list")?;
Ok((response.service_names, response.count))
}
async fn subscribe_async(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
event_listener: Option<Arc<dyn NamingEventListener>>,
) -> Result<ServiceInfo> {
let clusters = clusters.join(",");
let group_name = crate::common::util::normalize_group_name(group_name);
let key = ServiceInfo::get_key_without_clusters(&service_name, &group_name);
self.service_info_updater
.schedule_update(
self.namespace.clone(),
service_name.clone(),
group_name.clone(),
clusters.clone(),
)
.await;
if let Some(event_listener) = event_listener {
self.observer.subscribe(key.clone(), event_listener).await;
}
let request = SubscribeServiceRequest::new(
true,
clusters,
Some(service_name),
Some(self.namespace.clone()),
Some(group_name),
);
let auto_request: Arc<dyn AutomaticRequest> = Arc::new(request.clone());
let redo_task = NamingRedoTask::new(self.nacos_grpc_client.clone(), auto_request);
let redo_task = Arc::new(redo_task);
redo_task.active();
self.redo_task_executor.add_task(redo_task.clone()).await;
if let Some(cached_info) = self.naming_cache.get(&key) {
info!("subscribe returned cached service_info, group_key={key}");
let remote_client = self.nacos_grpc_client.clone();
executor::spawn(async move {
let response = remote_client
.send_request::<SubscribeServiceRequest, SubscribeServiceResponse>(request)
.await;
if let Ok(resp) = response
&& crate::common::error::handle_response(&resp, "subscribe").is_ok()
{
debug!("subscribe the {resp:?}");
redo_task.frozen();
}
});
return Ok(cached_info.clone());
}
let response = self
.request_to_server::<SubscribeServiceRequest, SubscribeServiceResponse>(request)
.await?;
crate::common::error::handle_response(&response, "subscribe")?;
debug!("subscribe the {response:?}");
redo_task.frozen();
Ok(response.service_info)
}
async fn unsubscribe_async(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
event_listener: Option<Arc<dyn NamingEventListener>>,
) -> Result<()> {
let clusters = clusters.join(",");
let group_name = crate::common::util::normalize_group_name(group_name);
self.service_info_updater
.stop_update(service_name.clone(), group_name.clone(), clusters.clone())
.await;
if let Some(event_listener) = event_listener {
let key = ServiceInfo::get_key_without_clusters(&service_name, &group_name);
self.observer.unsubscribe(key, event_listener).await;
}
let request = SubscribeServiceRequest::new(
false,
clusters,
Some(service_name),
Some(self.namespace.clone()),
Some(group_name),
);
let auto_request: Arc<dyn AutomaticRequest> = Arc::new(request.clone());
let redo_task = NamingRedoTask::new(self.nacos_grpc_client.clone(), auto_request);
let response = self
.request_to_server::<SubscribeServiceRequest, SubscribeServiceResponse>(request)
.await?;
crate::common::error::handle_response(&response, "unsubscribe")?;
debug!("unsubscribe the {response:?}");
self.redo_task_executor
.remove_task(redo_task.task_key().as_str())
.await;
Ok(())
}
}
impl NacosNamingService {
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn register_instance(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
if service_instance.ephemeral {
self.register_ephemeral_instance_async(service_name, group_name, service_instance)
.await
} else {
self.register_persistent_instance_async(service_name, group_name, service_instance)
.await
}
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn deregister_instance(
&self,
service_name: String,
group_name: Option<String>,
service_instance: ServiceInstance,
) -> Result<()> {
if service_instance.ephemeral {
self.deregister_ephemeral_instance_async(service_name, group_name, service_instance)
.await
} else {
self.deregister_persistent_instance_async(service_name, group_name, service_instance)
.await
}
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn batch_register_instance(
&self,
service_name: String,
group_name: Option<String>,
service_instances: Vec<ServiceInstance>,
) -> Result<()> {
self.batch_register_instance_async(service_name, group_name, service_instances)
.await
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn get_all_instances(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
) -> Result<Vec<ServiceInstance>> {
self.get_all_instances_async(service_name, group_name, clusters, subscribe)
.await
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn select_instances(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
healthy: bool,
) -> Result<Vec<ServiceInstance>> {
self.select_instances_async(service_name, group_name, clusters, subscribe, healthy)
.await
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn select_one_healthy_instance(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
subscribe: bool,
) -> Result<ServiceInstance> {
self.select_one_healthy_instance_async(service_name, group_name, clusters, subscribe)
.await
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn get_service_list(
&self,
page_no: i32,
page_size: i32,
group_name: Option<String>,
) -> Result<(Vec<String>, i32)> {
self.get_service_list_async(page_no, page_size, group_name)
.await
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn subscribe(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
event_listener: Arc<dyn NamingEventListener>,
) -> Result<()> {
let _ = self
.subscribe_async(service_name, group_name, clusters, Some(event_listener))
.await;
Ok(())
}
#[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)]
pub(crate) async fn unsubscribe(
&self,
service_name: String,
group_name: Option<String>,
clusters: Vec<String>,
event_listener: Arc<dyn NamingEventListener>,
) -> Result<()> {
self.unsubscribe_async(service_name, group_name, clusters, Some(event_listener))
.await
}
}
#[cfg(test)]
pub(crate) mod tests {
use core::time;
use std::collections::HashMap;
use tracing::info;
use crate::api::{naming::NamingChangeEvent, plugin::NoopAuthPlugin};
use super::*;
fn tracing_log_try_init() {
crate::test_config::setup_log();
}
#[tokio::test]
#[ignore]
async fn test_ephemeral_register_service() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata,
..Default::default()
};
let ret = naming_service
.register_instance("test-ephemeral-service".to_string(), None, service_instance)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(1);
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_persistent_register_service() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 8848,
ephemeral: false,
metadata,
..Default::default()
};
let ret = naming_service
.register_instance(
"test-persistent-service".to_string(),
None,
service_instance,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(1);
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_register_and_deregister_persistent_service() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 8848,
metadata,
ephemeral: false,
..Default::default()
};
let ret = naming_service
.register_instance(
"test-persistent-service".to_string(),
None,
service_instance.clone(),
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
let ret = naming_service
.deregister_instance(
"test-persistent-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
service_instance,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(1);
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_register_and_deregister_ephemeral_service() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata,
..Default::default()
};
let ret = naming_service
.register_instance(
"test-ephemeral-service".to_string(),
None,
service_instance.clone(),
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
let ret = naming_service
.deregister_instance(
"test-ephemeral-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
service_instance,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(1);
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_batch_register_service() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_batch_register_service_and_query_all_instances() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
let all_instances = naming_service
.get_all_instances(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
Vec::default(),
false,
)
.await;
info!("response. {all_instances:?}");
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_select_instance() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
let all_instances = naming_service
.select_instances(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
Vec::default(),
false,
true,
)
.await;
info!("response. {all_instances:?}");
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_select_one_healthy_instance() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
for _ in 0..3 {
let all_instances = naming_service
.select_one_healthy_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
Vec::default(),
false,
)
.await;
info!("response. {all_instances:?}");
}
tokio::time::sleep(ten_millis).await;
}
#[tokio::test]
#[ignore]
async fn test_get_service_list() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(3);
tokio::time::sleep(ten_millis).await;
let service_list = naming_service.get_service_list(1, 50, None).await;
info!("response. {service_list:?}");
tokio::time::sleep(ten_millis).await;
}
#[derive(Hash, PartialEq)]
pub struct InstancesChangeEventListener;
impl NamingEventListener for InstancesChangeEventListener {
fn event(&self, event: Arc<NamingChangeEvent>) {
info!("InstancesChangeEventListener: {event:?}");
}
}
#[tokio::test]
#[ignore]
async fn test_service_push() {
tracing_log_try_init();
let props = ClientProps::new().server_addr("127.0.0.1:8848");
let mut metadata = HashMap::<String, String>::new();
metadata.insert("netType".to_string(), "external".to_string());
metadata.insert("version".to_string(), "2.0".to_string());
let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))
.await
.expect("Failed to create NacosNamingService in test");
let service_instance1 = ServiceInstance {
ip: "127.0.0.1".to_string(),
port: 9090,
metadata: metadata.clone(),
..Default::default()
};
let service_instance2 = ServiceInstance {
ip: "192.168.1.1".to_string(),
port: 8888,
metadata: metadata.clone(),
..Default::default()
};
let service_instance3 = ServiceInstance {
ip: "172.0.2.1".to_string(),
port: 6666,
metadata: metadata.clone(),
..Default::default()
};
let instance_vec = vec![service_instance1, service_instance2, service_instance3];
let ret = naming_service
.batch_register_instance(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
instance_vec,
)
.await;
info!("response. {ret:?}");
let listener = Arc::new(InstancesChangeEventListener);
let ret = naming_service
.subscribe(
"test-service".to_string(),
Some(crate::api::constants::DEFAULT_GROUP.to_string()),
Vec::default(),
listener,
)
.await;
info!("response. {ret:?}");
let ten_millis = time::Duration::from_secs(1);
tokio::time::sleep(ten_millis).await;
}
}