#![allow(
unused_imports,
unused_assignments,
unused_variables,
unused_mut,
dead_code
)]
use super::api_model::QueryListResult;
use super::cluster::instance_delay_notify::{
ClusterInstanceDelayNotifyActor, InstanceDelayNotifyRequest,
};
use super::cluster::model::{
NamingRouteRequest, ProcessRange, SnapshotForReceive, SnapshotForSend,
};
use super::cluster::node_manage::{InnerNodeManage, NodeManageRequest};
use super::filter::InstanceFilterUtils;
use super::listener::{InnerNamingListener, ListenerItem, NamingListenerCmd};
use super::model::InstanceShortKey;
use super::model::InstanceUpdateTag;
use super::model::ServiceDetailDto;
use super::model::ServiceInfo;
use super::model::ServiceKey;
use super::model::UpdateInstanceType;
use super::model::{DistroData, Instance};
use super::model::{InstanceKey, UpdatePerpetualType};
use super::naming_delay_nofity::DelayNotifyActor;
use super::naming_delay_nofity::DelayNotifyCmd;
use super::naming_subscriber::NamingListenerItem;
use super::naming_subscriber::Subscriber;
use super::service::ServiceInfoDto;
use super::service::ServiceMetadata;
use super::service::{Service, SubscriberInfoDto};
use super::service_index::NamespaceIndex;
use super::service_index::ServiceQueryParam;
use super::NamingUtils;
use crate::common::hash_utils::get_hash_value;
use crate::common::NamingSysConfig;
use crate::common::{delay_notify, AppSysConfig};
use crate::grpc::bistream_manage::BiStreamManage;
use crate::now_millis_i64;
use crate::utils::gz_encode;
use crate::{now_millis, now_second_i32};
use bean_factory::{bean, Inject, InjectComponent};
use chrono::Local;
use inner_mem_cache::TimeoutSet;
use serde::{Deserialize, Serialize};
use std::cmp::{max, Ordering};
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::LinkedList;
use std::default::Default;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use crate::common::constant::EMPTY_ARC_STRING;
use crate::common::constant::NAMING_INSTANCE_TABLE;
use crate::common::model::privilege::NamespacePrivilegeGroup;
use crate::common::pb::data_object::InstanceDo;
use crate::metrics::metrics_key::MetricsKey;
use crate::metrics::model::{MetricsItem, MetricsQuery, MetricsRecord};
use crate::namespace::NamespaceActor;
use crate::naming::instance_meta_manager::{InstanceMetaManager, InstanceMetaManagerReq};
use crate::naming::instance_meta_repository::InstanceMetaDto;
use crate::naming::model::actor_model::{
InstanceRegisterParam, LoadResult, NamingRaftReq, NamingRaftResult, SnapshotBuildRequest,
SnapshotLoadRequest,
};
use crate::naming::sniffing::{NetSniffing, NetSniffingCmd};
use crate::raft::cluster::route::RaftRequestRoute;
use crate::raft::filestore::model::SnapshotRecordDto;
use crate::raft::filestore::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse};
use crate::raft::network::core::RaftRouter;
use crate::raft::store::{ClientRequest, ClientResponse};
use crate::transfer::writer::TransferWriterActor;
use actix::prelude::*;
use quick_protobuf::{BytesReader, Writer};
use regex::Regex;
/// Actor that owns the in-memory naming registry: services, their instances,
/// subscriber bookkeeping and the addresses of the collaborating actors.
///
/// Most `Option<Addr<..>>` fields start as `None` in `new()` and are filled in
/// by `Inject::inject`.
#[bean(inject)]
pub struct NamingActor {
/// All known services, keyed by `ServiceKey`.
pub(crate) service_map: HashMap<ServiceKey, Service>,
/// Initialised to 0 in `new()`; not otherwise touched in the visible code.
pub(crate) last_id: u64,
/// Listener actor that receives `NamingListenerCmd::Add` from `update_listener`.
pub(crate) listener_addr: Option<Addr<InnerNamingListener>>,
/// Delay-notify actor; also handed to `subscriber` during injection.
pub(crate) delay_notify_addr: Option<Addr<DelayNotifyActor>>,
/// Subscriber registry that is notified when a service's instances change.
pub(crate) subscriber: Subscriber,
/// Timeouts and limits used by the maintenance routines.
pub(crate) sys_config: NamingSysConfig,
/// Services scheduled for the empty-service cleanup (see `clear_empty_service`).
pub(crate) empty_service_set: TimeoutSet<ServiceKey>,
/// Instances whose stored metadata is scheduled for cleanup
/// (see `clear_timeout_instance_metadata`). The spelling "metadate" is kept
/// because other code refers to this field by name.
pub(crate) instance_metadate_set: TimeoutSet<InstanceKey>,
/// Index queried by `query_service_page` for paged service lookups.
pub(crate) namespace_index: NamespaceIndex,
/// Instance keys grouped by the client id that registered them.
pub(crate) client_instance_set: HashMap<Arc<String>, HashSet<InstanceKey>>,
/// Cluster node manager used to broadcast client-id removals.
pub(crate) cluster_node_manage: Option<Addr<InnerNodeManage>>,
/// Actor that receives instance update/remove/beat notifications for the cluster.
pub(crate) cluster_delay_notify: Option<Addr<ClusterInstanceDelayNotifyActor>>,
/// Namespace actor; a clone is also stored on `namespace_index`.
pub(crate) namespace_actor: Option<Addr<NamespaceActor>>,
/// Hash range installed by `refresh_process_range` and consulted in
/// `update_instance`.
pub(crate) current_range: Option<ProcessRange>,
/// Taken from `AppSysConfig::raft_node_id`; used as the `"{node_id}_"` client-id
/// prefix in `query_grpc_distro_data`.
pub(crate) node_id: u64,
/// When the `debug` feature is enabled, `do_notify` returns early if this is set.
pub(crate) disable_notify: bool,
/// Only populated when the configured perpetual probe interval is positive.
pub(crate) net_sniffing_addr: Option<Addr<NetSniffing>>,
/// `now_second_i32()` value recorded at the last perpetual-instance probe.
pub(crate) last_perpetual_instance_probe_time: i32,
/// Raft request router bean, if one is registered.
pub(crate) raft_router: Option<Arc<RaftRequestRoute>>,
/// Instance metadata manager; passed to `Service::update_instance` and used to
/// request metadata removal.
pub(crate) meta_manager_addr: Option<Addr<InstanceMetaManager>>,
}
/// Actix actor implementation for the naming registry.
impl Actor for NamingActor {
    type Context = Context<Self>;

    /// Only logs that the actor has started.
    fn started(&mut self, _ctx: &mut Context<Self>) {
        log::info!(" NamingActor started");
    }
}
/// Dependency wiring for the actor via the bean factory.
impl Inject for NamingActor {
    type Context = Context<Self>;

    /// Fetches collaborating actors and beans from `factory_data`, applies the
    /// overrides found in `AppSysConfig` (when that bean exists) and finally
    /// schedules the periodic maintenance tick.
    fn inject(
        &mut self,
        factory_data: bean_factory::FactoryData,
        _factory: bean_factory::BeanFactory,
        ctx: &mut Self::Context,
    ) {
        self.listener_addr = factory_data.get_actor();
        self.delay_notify_addr = factory_data.get_actor();
        // The subscriber needs its own handle to the delay-notify actor.
        if let Some(notify_addr) = &self.delay_notify_addr {
            self.subscriber.set_notify_addr(notify_addr.clone());
        }
        self.cluster_node_manage = factory_data.get_actor();
        self.cluster_delay_notify = factory_data.get_actor();
        self.namespace_actor = factory_data.get_actor();
        self.meta_manager_addr = factory_data.get_actor();
        self.namespace_index.namespace_actor = self.namespace_actor.clone();

        let app_config: Option<Arc<AppSysConfig>> = factory_data.get_bean();
        if let Some(app_config) = app_config {
            // Both timeouts get a fixed 3000 ms added on top of the configured value.
            let health_timeout = app_config.naming_health_timeout as i64 + 3000;
            let instance_timeout = app_config.naming_instance_timeout as i64 + 3000;
            self.sys_config.instance_health_timeout_millis = health_timeout;
            self.sys_config.instance_timeout_millis = instance_timeout;
            self.node_id = app_config.raft_node_id;
            log::info!(
                "NamingActor change naming timeout info from env,health_timeout:{},instance_timeout:{}",
                self.sys_config.instance_health_timeout_millis,
                self.sys_config.instance_timeout_millis
            );
            // The sniffing actor is only looked up when probing is enabled.
            let probe_interval = app_config.naming_perpetual_instance_probe_interval;
            if probe_interval > 0 {
                self.sys_config.perpetual_instance_probe_interval = probe_interval;
                self.net_sniffing_addr = factory_data.get_actor();
            }
        }

        self.raft_router = factory_data.get_bean();
        self.instance_time_out_heartbeat(ctx);
        log::info!("NamingActor inject complete");
    }
}
impl Default for NamingActor {
fn default() -> Self {
Self::new()
}
}
impl NamingActor {
pub fn new() -> Self {
let mut subscriber = Subscriber::default();
Self {
service_map: Default::default(),
last_id: 0u64,
listener_addr: None,
subscriber,
delay_notify_addr: None,
sys_config: NamingSysConfig::new(),
empty_service_set: Default::default(),
namespace_index: NamespaceIndex::new(),
instance_metadate_set: Default::default(),
client_instance_set: Default::default(),
cluster_node_manage: None,
cluster_delay_notify: None,
current_range: None,
namespace_actor: None,
node_id: 0,
disable_notify: false,
net_sniffing_addr: None,
last_perpetual_instance_probe_time: 0,
raft_router: None,
meta_manager_addr: None,
}
}
/// Builds a fresh actor with `new()` and starts it, returning its address.
pub fn new_and_create() -> Addr<Self> {
    let actor = Self::new();
    actor.start()
}
/// Spawns a thread that creates its own actix `System`, starts a new actor in
/// it and keeps that system running. The calling thread blocks until the
/// actor's address has been sent back over a channel.
///
/// # Panics
/// Panics if the address cannot be sent or received, or (on the spawned
/// thread) if running the system returns an error.
pub fn create_at_new_system() -> Addr<Self> {
    let (addr_tx, addr_rx) = std::sync::mpsc::sync_channel(1);
    std::thread::spawn(move || {
        let system = System::new();
        let actor_addr = system.block_on(async { Self::new().start() });
        addr_tx.send(actor_addr).unwrap();
        system.run().unwrap();
    });
    addr_rx.recv().unwrap()
}
/// Looks up the service stored under `key` and returns it mutably, if present.
#[inline]
pub(crate) fn get_service(&mut self, key: &ServiceKey) -> Option<&mut Service> {
    let services = &mut self.service_map;
    services.get_mut(key)
}
/// Registers an instance-less service for `key` unless one already exists.
///
/// The new service is added to the namespace index and the service map, and
/// is scheduled in `empty_service_set` with the configured service timeout.
pub(crate) fn create_empty_service(&mut self, key: &ServiceKey) {
    if self.get_service(key).is_some() {
        return;
    }

    let mut service = Service::default();
    let created_at = Local::now().timestamp_millis();
    service.service_name = key.service_name.clone();
    service.namespace_id = key.namespace_id.clone();
    service.group_name = key.group_name.clone();
    let group_service = NamingUtils::get_group_and_service_name(
        key.service_name.as_ref(),
        key.group_name.as_ref(),
    );
    service.group_service = Arc::new(group_service);
    service.last_modified_millis = created_at;
    service.recalculate_checksum();

    self.namespace_index.insert_service(key.clone());
    self.service_map.insert(key.clone(), service);
    let expire_at = now_millis() + self.sys_config.service_time_out_millis;
    self.empty_service_set.add(expire_at, key.clone());
}
/// Applies `service_info` to the registry.
///
/// If the service already exists, only the provided `protect_threshold`
/// and/or `metadata` are overwritten. Otherwise a new service is created
/// with those values, indexed, stored and scheduled in `empty_service_set`.
pub(crate) fn update_service(&mut self, service_info: ServiceDetailDto) {
    let protect_threshold = service_info.protect_threshold;
    let metadata = service_info.metadata;
    let key = ServiceKey::new_by_arc(
        service_info.namespace_id,
        service_info.group_name,
        service_info.service_name,
    );

    if let Some(existing) = self.get_service(&key) {
        if let Some(value) = protect_threshold {
            existing.protect_threshold = value;
        }
        if let Some(value) = metadata {
            existing.metadata = value;
        }
        return;
    }

    let mut service = Service::default();
    let created_at = Local::now().timestamp_millis();
    service.service_name = key.service_name.clone();
    service.namespace_id = key.namespace_id.clone();
    service.group_name = key.group_name.clone();
    let group_service = NamingUtils::get_group_and_service_name(
        key.service_name.as_ref(),
        key.group_name.as_ref(),
    );
    service.group_service = Arc::new(group_service);
    service.last_modified_millis = created_at;
    if let Some(value) = protect_threshold {
        service.protect_threshold = value;
    }
    if let Some(value) = metadata {
        service.metadata = value;
    }
    // The checksum is recalculated after the optional fields are applied.
    service.recalculate_checksum();

    self.namespace_index.insert_service(key.clone());
    self.service_map.insert(key.clone(), service);
    let expire_at = now_millis() + self.sys_config.service_time_out_millis;
    self.empty_service_set.add(expire_at, key.clone());
}
/// Loads stored instance metadata `records` into the service identified by
/// `key`, creating the service first when it does not exist.
///
/// Records with empty metadata are skipped. For every other record the
/// instance is scheduled in `instance_metadate_set`, an existing instance
/// whose own metadata is empty is replaced by a copy carrying the record's
/// metadata, and the record is stored in `instance_metadata_map`.
fn init_service_metadata(&mut self, key: ServiceKey, records: Vec<InstanceMetaDto>) {
    self.create_empty_service(&key);
    let service = match self.service_map.get_mut(&key) {
        Some(service) => service,
        None => return,
    };

    let expire_at = now_millis() + self.sys_config.instance_metadata_time_out_millis;
    for record in records {
        if record.metadata.is_empty() {
            continue;
        }
        let tracked_key = InstanceKey::new_by_service_key(
            &key,
            record.instance_key.ip.clone(),
            record.instance_key.port,
        );
        self.instance_metadate_set.add(expire_at, tracked_key);

        // Only instances that carry no metadata of their own are patched.
        let patched = service
            .instances
            .get(&record.instance_key)
            .filter(|instance| instance.metadata.is_empty())
            .map(|instance| {
                let mut copy = instance.as_ref().to_owned();
                copy.metadata = record.metadata.clone();
                Arc::new(copy)
            });
        if let Some(patched) = patched {
            service
                .instances
                .insert(record.instance_key.clone(), patched);
        }

        service
            .instance_metadata_map
            .insert(record.instance_key, record.metadata);
    }
}
/// Collects, for every service in the map, its key together with the list
/// returned by `Service::get_service_meta_list`.
fn get_service_metadata_list(&self) -> Vec<(ServiceKey, Vec<InstanceMetaDto>)> {
    self.service_map
        .iter()
        .map(|(service_key, service)| {
            let records = service.get_service_meta_list();
            (service_key.clone(), records)
        })
        .collect()
}
/// Removes the service under `service_map_key` if it has no instances.
///
/// An unknown service is treated as success.
///
/// # Errors
/// Returns an error when the service still has instances.
fn remove_empty_service(&mut self, service_map_key: ServiceKey) -> anyhow::Result<()> {
    let has_instances = match self.service_map.get(&service_map_key) {
        Some(service) => service.instance_size > 0,
        None => return Ok(()),
    };
    if has_instances {
        return Err(anyhow::anyhow!(
            "The service has instances,it can't remove!"
        ));
    }
    // A very large `now` is passed so the age check in
    // `clear_one_empty_service` is satisfied.
    self.clear_one_empty_service(service_map_key, 0x7fff_ffff_ffff_ffff);
    Ok(())
}
/// Notifies subscribers of the service `key`; the update tag is ignored.
fn notify_to_subscriber(&mut self, _tag: &UpdateInstanceType, key: ServiceKey) {
    let subscriber = &mut self.subscriber;
    subscriber.notify(key);
}
/// Fans an instance change out to local subscribers and to the cluster
/// delay-notify actor.
///
/// * `New` / `UpdateValue`: notify subscribers, then send `UpdateInstance`.
/// * `Remove`: notify subscribers, then send `RemoveInstance` only when the
///   instance's `from_cluster` is 0.
/// * `UpdateTime`: send `UpdateInstanceBeat` only when the instance is not
///   `from_grpc`; subscribers are not notified.
/// * Any other tag: nothing happens.
///
/// Cluster messages need both the cluster actor address and `instance`.
/// With the `debug` feature enabled, nothing happens while `disable_notify`
/// is set.
fn do_notify(
    &mut self,
    tag: &UpdateInstanceType,
    key: ServiceKey,
    instance: Option<Arc<Instance>>,
) {
    #[cfg(feature = "debug")]
    if self.disable_notify {
        return;
    }
    match tag {
        UpdateInstanceType::New | UpdateInstanceType::UpdateValue => {
            self.subscriber.notify(key);
            if let Some(cluster_addr) = &self.cluster_delay_notify {
                if let Some(instance) = instance {
                    cluster_addr.do_send(InstanceDelayNotifyRequest::UpdateInstance(instance));
                }
            }
        }
        UpdateInstanceType::Remove => {
            self.subscriber.notify(key);
            if let Some(cluster_addr) = &self.cluster_delay_notify {
                if let Some(instance) = instance.filter(|item| item.from_cluster == 0) {
                    cluster_addr.do_send(InstanceDelayNotifyRequest::RemoveInstance(instance));
                }
            }
        }
        UpdateInstanceType::UpdateTime => {
            if let Some(cluster_addr) = &self.cluster_delay_notify {
                if let Some(instance) = instance.filter(|item| !item.from_grpc) {
                    cluster_addr
                        .do_send(InstanceDelayNotifyRequest::UpdateInstanceBeat(instance));
                }
            }
        }
        _ => {}
    }
}
/// Removes the instance `instance_id` from the service `key`.
///
/// Returns `(UpdateInstanceType::None, UpdatePerpetualType::None)` when the
/// service is unknown. Otherwise the first element is `Remove` if
/// `Service::remove_instance` returned an instance and `None` if it did not;
/// the second element is `Remove` only when the removed instance was not
/// ephemeral.
///
/// Side effects: may schedule the instance in `instance_metadate_set`, may
/// schedule the service in `empty_service_set`, calls `do_notify`, and drops
/// the instance key from the owning client's entry in `client_instance_set`.
pub fn remove_instance(
&mut self,
key: &ServiceKey,
instance_id: &InstanceShortKey,
client_id: Option<&Arc<String>>,
) -> (UpdateInstanceType, UpdatePerpetualType) {
let service = if let Some(service) = self.service_map.get_mut(key) {
service
} else {
return (UpdateInstanceType::None, UpdatePerpetualType::None);
};
let mut real_client_id = None;
let mut perpetual_tag = UpdatePerpetualType::None;
let old_instance = service.remove_instance(instance_id, client_id);
let now = now_millis();
let tag = if let Some(old_instance) = &old_instance {
// Remember the client id stored on the removed instance; it is used below
// to clean up `client_instance_set`.
real_client_id = Some(old_instance.client_id.clone());
let short_key = old_instance.get_short_key();
// When `exist_priority_metadata` reports true for this instance, schedule
// it in the metadata timeout set.
if service.exist_priority_metadata(&short_key) {
let instance_key =
InstanceKey::new_by_service_key(key, short_key.ip, short_key.port);
self.instance_metadate_set.add(
now + self.sys_config.instance_metadata_time_out_millis,
instance_key,
);
}
if !old_instance.ephemeral {
perpetual_tag = UpdatePerpetualType::Remove;
}
UpdateInstanceType::Remove
} else {
UpdateInstanceType::None
};
// This check runs whether or not an instance was actually removed.
if service.instance_size <= 0 {
self.empty_service_set
.add(now + self.sys_config.service_time_out_millis, key.clone());
}
// Instances that came from another cluster node are not passed on to
// `do_notify`.
let remove_instance = old_instance.filter(|e| !e.is_from_cluster());
self.do_notify(&tag, key.clone(), remove_instance);
if let Some(client_id) = real_client_id {
if !client_id.as_ref().is_empty() {
let instance_key =
InstanceKey::new_by_service_key(key, instance_id.ip.clone(), instance_id.port);
self.remove_client_instance_key(&client_id, &instance_key);
}
}
(tag, perpetual_tag)
}
/// Registers or updates `instance` in the service `key`, creating the service
/// if needed, and returns the tag produced by `Service::update_instance`.
///
/// * `tag` is forwarded unchanged to `Service::update_instance`.
/// * `from_sync` — when true, no raft notification is sent to `self_addr` and
///   only local subscribers are notified (no cluster notification).
/// * `self_addr` — address that receives `NotifyUpdateRaftInstance` /
///   `NotifyRemoveRaftInstance` when the perpetual state changed and
///   `from_sync` is false.
///
/// If the resulting tag is `UpdateOtherClusterMetaData`, the function returns
/// right after the optional raft notification, without touching the replaced
/// client id or notifying anyone.
pub fn update_instance(
&mut self,
key: &ServiceKey,
mut instance: Instance,
tag: Option<InstanceUpdateTag>,
from_sync: bool,
self_addr: Option<Addr<Self>>,
) -> UpdateInstanceType {
instance.init();
self.create_empty_service(key);
// Whether the service hash falls into the range installed by
// `refresh_process_range`; false when no range is set.
let at_process_range = if let Some(range) = &self.current_range {
range.is_range(get_hash_value(&key) as usize)
} else {
false
};
// In-range, non-gRPC instances are reset to have no cluster origin and an
// empty client id.
if at_process_range && !instance.from_grpc {
instance.from_cluster = 0;
instance.client_id = EMPTY_ARC_STRING.clone();
}
let service = if let Some(service) = self.service_map.get_mut(key) {
service
} else {
log::warn!("update_instance not found service,{:?}", &key);
return UpdateInstanceType::None;
};
let client_id = instance.client_id.clone();
let instance_key =
InstanceKey::new_by_service_key(key, instance.ip.clone(), instance.port.to_owned());
// Record the instance under its client id when it came via gRPC or from the
// cluster and the client id is non-empty.
if (instance.from_grpc || instance.is_from_cluster()) && !instance.client_id.is_empty() {
if let Some(set) = self.client_instance_set.get_mut(&client_id) {
set.insert(instance_key.clone());
} else {
let mut set = HashSet::new();
set.insert(instance_key.clone());
self.client_instance_set.insert(client_id, set);
}
}
let instance_short_key = instance.get_short_key();
let (tag, replace_old_client_id, perpetua_type) =
service.update_instance(instance, tag, from_sync, &self.meta_manager_addr);
#[cfg(feature = "debug")]
log::info!(
"update_instance tag:{:?},key:{:?},replace_old_client_id:{:?}",
&tag,
instance_key,
&replace_old_client_id
);
// Changes to the perpetual state are reported to `self_addr`, but only for
// non-sync updates.
if !from_sync {
if let Some(self_addr) = self_addr {
match perpetua_type {
UpdatePerpetualType::New | UpdatePerpetualType::Update => {
let instance = service.get_instance(&instance_short_key);
if let Some(instance) = instance.clone() {
self_addr.do_send(NamingCmd::NotifyUpdateRaftInstance(instance));
}
}
UpdatePerpetualType::Remove => {
self_addr
.do_send(NamingCmd::NotifyRemoveRaftInstance(instance_key.clone()));
}
UpdatePerpetualType::None => {}
}
}
}
if let UpdateInstanceType::UpdateOtherClusterMetaData(_, _) = &tag {
return tag;
}
// If the update reported a replaced client id, drop this instance key from
// that client's set.
if let Some(replace_old_client_id) = replace_old_client_id {
if let Some(set) = self.client_instance_set.get_mut(&replace_old_client_id) {
set.remove(&instance_key);
}
}
if !from_sync {
let instance = service.get_instance(&instance_short_key);
self.do_notify(&tag, key.clone(), instance);
} else {
self.notify_to_subscriber(&tag, key.clone());
}
tag
}
/// Forgets `client_id` and removes every instance recorded for it.
pub(crate) fn remove_client_instance(&mut self, client_id: &Arc<String>) {
    let owned_keys = match self.client_instance_set.remove(client_id) {
        Some(keys) => keys,
        None => return,
    };
    for instance_key in owned_keys {
        let service_key = instance_key.get_service_key();
        let short_key = instance_key.get_short_key();
        self.remove_instance(&service_key, &short_key, Some(client_id));
    }
}
/// Drops `key` from the set recorded for `client_id`; does nothing when the
/// client is unknown.
fn remove_client_instance_key(&mut self, client_id: &Arc<String>, key: &InstanceKey) {
    match self.client_instance_set.get_mut(client_id) {
        Some(keys) => {
            keys.remove(key);
        }
        None => {}
    }
}
/// Returns the instance `instance_id` of service `key`, or `None` when either
/// the service or the instance is unknown.
pub fn get_instance(
    &self,
    key: &ServiceKey,
    instance_id: &InstanceShortKey,
) -> Option<Arc<Instance>> {
    self.service_map
        .get(key)
        .and_then(|service| service.get_instance(instance_id))
}
/// Lists the instances of service `key` for the clusters named in
/// `cluster_str`, passed through `InstanceFilterUtils::default_instance_filter`
/// with the service metadata and `only_healthy`.
///
/// Returns an empty list when the service is unknown.
pub fn get_instance_list(
    &self,
    key: &ServiceKey,
    cluster_str: &str,
    only_healthy: bool,
) -> Vec<Arc<Instance>> {
    let cluster_names = NamingUtils::split_filters(cluster_str);
    match self.service_map.get(key) {
        Some(service) => {
            // Health filtering is left to the filter utility, so the raw list
            // is requested with `only_healthy = false`.
            let candidates = service.get_instance_list(cluster_names, false, true);
            InstanceFilterUtils::default_instance_filter(
                candidates,
                Some(service.get_metadata()),
                only_healthy,
            )
        }
        None => Vec::new(),
    }
}
/// Returns one page of the filtered instance list, sorted by short key,
/// together with the total number of instances before paging.
///
/// `page_index` is 1-based; 0 is treated like the first page.
pub fn get_instance_page(
    &self,
    key: &ServiceKey,
    cluster_str: &str,
    only_healthy: bool,
    page_size: usize,
    page_index: usize,
) -> (usize, Vec<Arc<Instance>>) {
    let mut sorted = self.get_instance_list(key, cluster_str, only_healthy);
    let total = sorted.len();
    let offset = match page_index {
        0 => 0,
        index => page_size * (index - 1),
    };
    sorted.sort_by(|left, right| left.get_short_key().cmp(&right.get_short_key()));
    let page_data: Vec<_> = sorted.into_iter().skip(offset).take(page_size).collect();
    (total, page_data)
}
/// Returns the instance list of service `key` (restricted to the clusters in
/// `cluster_str` and, if requested, to healthy instances) along with the
/// service metadata. Unknown services yield `(vec![], None)`.
pub fn get_instances_and_metadata(
    &self,
    key: &ServiceKey,
    cluster_str: &str,
    only_healthy: bool,
) -> (Vec<Arc<Instance>>, Option<ServiceMetadata>) {
    let cluster_names = NamingUtils::split_filters(cluster_str);
    match self.service_map.get(key) {
        Some(service) => {
            let instances = service.get_instance_list(cluster_names, only_healthy, true);
            (instances, Some(service.get_metadata()))
        }
        None => (Vec::new(), None),
    }
}
/// Returns the metadata of service `key`, or `None` if the service is unknown.
pub fn get_metadata(&self, key: &ServiceKey) -> Option<ServiceMetadata> {
    let service = self.service_map.get(key)?;
    Some(service.get_metadata())
}
/// Groups the instances of service `key` by their `cluster_name`.
///
/// Returns an empty map when the service is unknown.
pub fn get_instance_map(
    &self,
    key: &ServiceKey,
    cluster_names: Vec<String>,
    only_healthy: bool,
) -> HashMap<String, Vec<Arc<Instance>>> {
    let mut grouped: HashMap<String, Vec<Arc<Instance>>> = HashMap::new();
    let service = match self.service_map.get(key) {
        Some(service) => service,
        None => return grouped,
    };
    for item in service.get_instance_list(cluster_names, only_healthy, true) {
        match grouped.get_mut(&item.cluster_name) {
            Some(bucket) => bucket.push(item),
            None => {
                grouped.insert(item.cluster_name.to_owned(), vec![item]);
            }
        }
    }
    grouped
}
pub(crate) fn get_service_info(
&self,
key: &ServiceKey,
cluster_str: String,
only_healthy: bool,
) -> ServiceInfo {
let (hosts, metadata) = self.get_instances_and_metadata(key, &cluster_str, false);
let service_info = ServiceInfo {
name: Some(key.service_name.clone()),
group_name: Some(key.group_name.clone()),
cache_millis: 10000i64,
last_ref_time: now_millis_i64(),
reach_protection_threshold: false,
hosts: Some(hosts),
clusters: Some(cluster_str),
..Default::default()
};
InstanceFilterUtils::default_service_filter(service_info, metadata, only_healthy)
}
/// Renders the filtered instance list of service `key` as a string using
/// `QueryListResult::get_instance_list_string`.
pub fn get_instance_list_string(
    &self,
    key: &ServiceKey,
    cluster_str: String,
    only_healthy: bool,
) -> String {
    let instances = self.get_instance_list(key, &cluster_str, only_healthy);
    QueryListResult::get_instance_list_string(cluster_str, key, instances)
}
/// Runs the periodic instance timeout check over the services.
///
/// Each service is checked with the healthy/offline cut-off times derived
/// from the configured timeouts. Removed instances that have priority
/// metadata are scheduled in `instance_metadate_set`, and services left
/// without instances are scheduled in `empty_service_set`. Notifications are
/// sent only after the loop has finished.
///
/// The pass stops early once the number of changed instances reaches
/// `once_time_check_size`.
pub fn time_check(&mut self) {
let current_time = Local::now().timestamp_millis();
// Cut-off timestamps passed to `Service::time_check`.
let healthy_time = current_time - self.sys_config.instance_health_timeout_millis;
let offline_time = current_time - self.sys_config.instance_timeout_millis;
let mut size = 0;
let now = now_millis();
let mut change_list = vec![];
for item in self.service_map.values_mut() {
let service_key = item.get_service_key();
// `rlist` is later handled as the removed instances and `ulist` as the
// updated ones (see `time_check_notify`).
let (rlist, ulist) = item.time_check(healthy_time, offline_time);
size += rlist.len() + ulist.len();
if !rlist.is_empty() {
for short_key in &rlist {
if item.exist_priority_metadata(short_key) {
let instance_key = InstanceKey::new_by_service_key(
&service_key,
short_key.ip.clone(),
short_key.port,
);
self.instance_metadate_set.add(
now + self.sys_config.instance_metadata_time_out_millis,
instance_key,
);
}
}
}
if item.instance_size <= 0 {
self.empty_service_set.add(
now + self.sys_config.service_time_out_millis,
item.get_service_key(),
);
}
change_list.push((service_key, rlist, ulist));
// Limit the amount of work done in a single pass.
if size >= self.sys_config.once_time_check_size {
break;
}
}
// Notifications need `&mut self`, so they are sent after the map iteration.
for (service_key, rlist, ulist) in change_list {
self.time_check_notify(service_key, rlist, ulist);
}
}
/// Asks the sniffing actor to probe every perpetual host, at most once per
/// configured probe interval.
///
/// Does nothing when the interval has not elapsed or no sniffing actor is
/// configured. Hosts shared by several services are probed once, with the
/// list of all services that reference them.
fn trigger_perpetual_health_check(&mut self) {
    let now = now_second_i32();
    let elapsed = now - self.last_perpetual_instance_probe_time;
    if elapsed < self.sys_config.perpetual_instance_probe_interval {
        return;
    }
    let sniffing_addr = match &self.net_sniffing_addr {
        Some(addr) => addr.clone(),
        None => return,
    };
    self.last_perpetual_instance_probe_time = now;

    let mut host_servers: HashMap<InstanceShortKey, Vec<ServiceKey>> = HashMap::new();
    for service in self.service_map.values() {
        for host in &service.perpetual_host_set {
            let service_key = service.get_service_key();
            host_servers
                .entry(host.clone())
                .or_insert_with(Vec::new)
                .push(service_key);
        }
    }
    for (host, service_keys) in host_servers {
        sniffing_addr.do_send(NetSniffingCmd::ProbeServiceHost(host, service_keys));
    }
}
/// Applies a probe result for `host` to every listed service that is still
/// registered: a successful probe marks the perpetual instance healthy, a
/// failed one marks the instance unhealthy.
fn update_perpetual_health(
    &mut self,
    host: InstanceShortKey,
    service_keys: Vec<ServiceKey>,
    sniffing_result: bool,
) {
    #[cfg(feature = "debug")]
    log::info!(
        "update_perpetual_health,host:{:?},keys:{:?},result:{}",
        &host,
        &service_keys,
        sniffing_result
    );
    for service_key in service_keys {
        let service = match self.service_map.get_mut(&service_key) {
            Some(service) => service,
            None => continue,
        };
        if sniffing_result {
            service.update_perpetual_instance_healthy_valid(&host);
        } else {
            service.update_instance_healthy_invalid(&host);
        }
    }
}
/// Publishes the outcome of a timeout check for one service: removals first,
/// then updates. Each non-empty list is synced to the cluster and followed by
/// a `do_notify` call without an instance.
fn time_check_notify(
    &mut self,
    key: ServiceKey,
    remove_list: Vec<InstanceShortKey>,
    update_list: Vec<InstanceShortKey>,
) {
    if !remove_list.is_empty() {
        self.time_check_sync_remove_info_to_cluster(key.clone(), remove_list);
        self.do_notify(&UpdateInstanceType::Remove, key.clone(), None);
    }
    if update_list.is_empty() {
        return;
    }
    self.time_check_sync_update_info_to_cluster(key.clone(), update_list);
    self.do_notify(&UpdateInstanceType::UpdateValue, key, None);
}
/// Sends an `UpdateInstance` cluster notification for each listed instance of
/// service `key` that still exists and did not come from the cluster.
///
/// Nothing is sent without a cluster actor, for an unknown service, or for a
/// service with no instances.
fn time_check_sync_update_info_to_cluster(
    &self,
    key: ServiceKey,
    update_list: Vec<InstanceShortKey>,
) {
    let cluster_addr = match &self.cluster_delay_notify {
        Some(addr) => addr,
        None => return,
    };
    let service = match self.service_map.get(&key) {
        Some(service) => service,
        None => return,
    };
    if service.instances.is_empty() {
        return;
    }
    let local_instances = update_list
        .iter()
        .filter_map(|short_key| service.get_instance(short_key))
        .filter(|instance| !instance.is_from_cluster());
    for instance in local_instances {
        cluster_addr.do_send(InstanceDelayNotifyRequest::UpdateInstance(instance));
    }
}
/// Sends a `RemoveInstance` cluster notification for every entry of
/// `remove_list`.
///
/// The message carries a placeholder `Instance` with only the service
/// identity, ip and port set; all other fields are defaults.
fn time_check_sync_remove_info_to_cluster(
    &self,
    key: ServiceKey,
    remove_list: Vec<InstanceShortKey>,
) {
    let cluster_addr = match &self.cluster_delay_notify {
        Some(addr) => addr,
        None => return,
    };
    for short_key in remove_list {
        let placeholder = Instance {
            namespace_id: key.namespace_id.clone(),
            group_name: key.group_name.clone(),
            service_name: key.service_name.clone(),
            ip: short_key.ip,
            port: short_key.port,
            ..Default::default()
        };
        cluster_addr.do_send(InstanceDelayNotifyRequest::RemoveInstance(Arc::new(
            placeholder,
        )));
    }
}
/// Returns one page of service names for the namespace and group of `key`,
/// together with the total count reported by the namespace index.
///
/// `page_index` is 1-based; 0 is treated like the first page. The offset is
/// computed with saturating arithmetic, so very large page values give an
/// out-of-range offset instead of overflowing.
pub fn get_service_list(
    &self,
    page_size: usize,
    page_index: usize,
    key: &ServiceKey,
) -> (usize, Vec<Arc<String>>) {
    let offset = page_index.saturating_sub(1).saturating_mul(page_size);
    let param = ServiceQueryParam {
        offset,
        limit: page_size,
        namespace_id: Some(key.namespace_id.clone()),
        group: Some(key.group_name.clone()),
        ..Default::default()
    };
    let (size, list) = self.namespace_index.query_service_page(&param);
    let service_names = list.into_iter().map(|e| e.service_name).collect::<Vec<_>>();
    (size, service_names)
}
/// Returns one page of subscribers whose listener matches the group, service
/// and namespace of `key` (via `Subscriber::fuzzy_match_listener`), together
/// with the total number of matches.
///
/// Results are ordered by service name, group name, ip and port.
/// `page_index` is 1-based; 0 is treated like the first page, as in the other
/// paging methods, instead of underflowing.
#[deprecated]
pub fn get_subscribers_list(
    &self,
    page_size: usize,
    page_index: usize,
    key: &ServiceKey,
) -> (usize, Vec<Arc<SubscriberInfoDto>>) {
    let mut ret = Vec::new();
    let res = self.subscriber.fuzzy_match_listener(
        &key.group_name,
        &key.service_name,
        &key.namespace_id,
    );
    for (service_key, val) in res {
        for (ip_port, _) in val {
            let (ip, port) = Subscriber::parse_ip_port(ip_port.as_ref());
            let subscriber_info = SubscriberInfoDto {
                service_name: service_key.service_name.clone(),
                group_name: service_key.group_name.clone(),
                namespace_id: service_key.namespace_id.clone(),
                ip,
                port,
            };
            ret.push(Arc::new(subscriber_info));
        }
    }
    let total = ret.len();
    // Saturating arithmetic: `page_index == 0` must not underflow, and a huge
    // page request must not overflow.
    let start = page_index.saturating_sub(1).saturating_mul(page_size);
    ret.sort_by(|a, b| {
        a.service_name
            .cmp(&b.service_name)
            .then(a.group_name.cmp(&b.group_name))
            .then(a.ip.cmp(&b.ip))
            .then(a.port.cmp(&b.port))
    });
    let paginated_result = ret
        .into_iter()
        .skip(start)
        .take(page_size)
        .collect::<Vec<_>>();
    (total, paginated_result)
}
/// Returns one page of subscribers from `Subscriber::query_service_listener_page`,
/// using the namespace of `key` and its group and service names as
/// `like_group` / `like_service` filters.
///
/// `page_index` is 1-based; 0 is treated like the first page. The offset is
/// computed with saturating arithmetic.
pub fn get_subscribers_list_v2(
    &self,
    page_size: usize,
    page_index: usize,
    key: &ServiceKey,
) -> (usize, Vec<SubscriberInfoDto>) {
    let offset = page_index.saturating_sub(1).saturating_mul(page_size);
    let param = ServiceQueryParam {
        offset,
        limit: page_size,
        namespace_id: Some(key.namespace_id.clone()),
        like_group: Some(key.group_name.as_ref().clone()),
        like_service: Some(key.service_name.as_ref().clone()),
        ..Default::default()
    };
    self.subscriber.query_service_listener_page(&param)
}
pub fn get_subscribers_list_by_param(
&self,
param: ServiceQueryParam,
) -> (usize, Vec<SubscriberInfoDto>) {
self.subscriber.query_service_listener_page(¶m)
}
pub fn get_service_info_page(&self, param: ServiceQueryParam) -> (usize, Vec<ServiceInfoDto>) {
let (size, list) = self.namespace_index.query_service_page(¶m);
if size == 0 {
return (0, Vec::new());
}
let mut info_list = Vec::with_capacity(list.len());
for item in &list {
if let Some(service) = self.service_map.get(item) {
info_list.push(service.get_service_info());
}
}
(size, info_list)
}
/// Registers a listener for service `key` by sending `NamingListenerCmd::Add`
/// to the listener actor; does nothing when no listener actor is configured.
fn update_listener(
    &mut self,
    key: &ServiceKey,
    cluster_names: &[String],
    addr: SocketAddr,
    only_healthy: bool,
) {
    let listener_addr = match self.listener_addr.as_ref() {
        Some(listener_addr) => listener_addr,
        None => return,
    };
    let item = ListenerItem::new(cluster_names.to_vec(), only_healthy, addr);
    listener_addr.do_send(NamingListenerCmd::Add(key.clone(), item));
}
/// Takes every service key whose entry in `empty_service_set` has timed out
/// and hands it to `clear_one_empty_service`.
fn clear_empty_service(&mut self) {
    let now = now_millis();
    let expired = self.empty_service_set.timeout(now);
    for service_map_key in expired {
        self.clear_one_empty_service(service_map_key, now);
    }
}
/// Removes the service under `service_map_key` from the namespace index and
/// the service map, but only when it has no instances and
/// `now - service_time_out_millis >= last_empty_times`.
fn clear_one_empty_service(&mut self, service_map_key: ServiceKey, now: u64) {
    let service = match self.service_map.get(&service_map_key) {
        Some(service) => service,
        None => return,
    };
    if service.instance_size > 0 {
        return;
    }
    if now - self.sys_config.service_time_out_millis < service.last_empty_times {
        return;
    }
    let index_key = service.get_service_key();
    self.namespace_index.remove_service(&index_key);
    self.service_map.remove(&service_map_key);
    log::info!("clear_empty_service:{:?}", &service_map_key);
}
/// Takes every instance key whose entry in `instance_metadate_set` has timed
/// out and hands it to `clear_one_timeout_instance_metadata`.
fn clear_timeout_instance_metadata(&mut self) {
    let meta_manager_addr = self.meta_manager_addr.clone();
    let expired = self.instance_metadate_set.timeout(now_millis());
    for instance_key in expired {
        self.clear_one_timeout_instance_metadata(instance_key, &meta_manager_addr);
    }
}
/// Drops the stored metadata of an instance that is no longer registered.
///
/// Nothing happens if the service is unknown or the instance is still
/// present. Otherwise the entry is removed from `instance_metadata_map` and,
/// when a metadata manager is available, a `RemoveServiceMeta` request for the
/// service key is sent to it.
// NOTE(review): the request names the whole service key rather than the single
// instance — confirm this is the intended scope of `RemoveServiceMeta`.
fn clear_one_timeout_instance_metadata(
    &mut self,
    instance_key: InstanceKey,
    meta_manager_addr: &Option<Addr<InstanceMetaManager>>,
) {
    let service_key = instance_key.get_service_key();
    let service = match self.service_map.get_mut(&service_key) {
        Some(service) => service,
        None => return,
    };
    let short_key = instance_key.get_short_key();
    if service.instances.contains_key(&short_key) {
        return;
    }
    service.instance_metadata_map.remove(&short_key);
    if let Some(manager) = meta_manager_addr {
        manager.do_send(InstanceMetaManagerReq::RemoveServiceMeta {
            service_keys: vec![service_key],
        });
    }
}
/// Schedules the maintenance tick 2000 ms from now.
///
/// Each tick clears timed-out empty services and instance metadata, triggers
/// the perpetual health check, sends `NamingCmd::PeekListenerTimeout` to the
/// actor itself and then re-schedules the next tick.
pub fn instance_time_out_heartbeat(&self, ctx: &mut actix::Context<Self>) {
    let tick = Duration::from_millis(2000);
    ctx.run_later(tick, |actor, inner_ctx| {
        actor.clear_empty_service();
        actor.clear_timeout_instance_metadata();
        actor.trigger_perpetual_health_check();
        inner_ctx.address().do_send(NamingCmd::PeekListenerTimeout);
        actor.instance_time_out_heartbeat(inner_ctx);
    });
}
/// Builds the snapshot sent to another node.
///
/// For every service whose hash falls into one of `ranges`, the service
/// detail and the result of `get_owner_http_instances` are included. On top
/// of that, every instance recorded in `client_instance_set` that still
/// exists and did not come from the cluster is added, regardless of range.
pub fn build_snapshot_data(&self, ranges: Vec<ProcessRange>) -> SnapshotForSend {
    let mut service_details = Vec::new();
    let mut instances = Vec::new();

    for (service_key, service) in &self.service_map {
        let hash_value = get_hash_value(service_key) as usize;
        if ProcessRange::is_range_at_list(hash_value, &ranges) {
            service_details.push(service.get_service_detail());
            instances.extend(service.get_owner_http_instances());
        }
    }

    for instance_key in self.client_instance_set.values().flatten() {
        let service_key = instance_key.get_service_key();
        let short_key = instance_key.get_short_key();
        match self.get_instance(&service_key, &short_key) {
            Some(instance) if !instance.is_from_cluster() => instances.push(instance),
            _ => {}
        }
    }

    SnapshotForSend {
        route_index: 0,
        node_count: 0,
        services: service_details,
        instances,
        mode: 0,
    }
}
/// Returns a copy of the `client_instance_set` entries whose client id starts
/// with `"{node_id}_"`.
fn query_grpc_distro_data(&self) -> HashMap<Arc<String>, HashSet<InstanceKey>> {
    let own_prefix = format!("{}_", &self.node_id);
    self.client_instance_set
        .iter()
        .filter(|(client_id, _)| client_id.as_str().starts_with(own_prefix.as_str()))
        .map(|(client_id, keys)| (client_id.clone(), keys.clone()))
        .collect()
}
/// Compares the instance counts in `data` with the number of local instances
/// whose `from_cluster` equals `cluster_id`.
///
/// Returns, per service, `reported - local` for every service where the two
/// differ; matching services are left out. Unknown services count as having
/// zero local instances.
fn diff_grpc_distro_data(
    &self,
    cluster_id: u64,
    data: HashMap<ServiceKey, u64>,
) -> HashMap<ServiceKey, i64> {
    let mut result = HashMap::new();
    for (service_key, reported) in data.iter() {
        let local_count = self
            .service_map
            .get(service_key)
            .map(|service| {
                service
                    .instances
                    .values()
                    .filter(|item| item.from_cluster == cluster_id)
                    .count()
            })
            .unwrap_or(0);
        let diff = *reported as i64 - local_count as i64;
        if diff != 0 {
            log::info!("diff_grpc_distro_data:{:?},{}", service_key, diff);
            result.insert(service_key.clone(), diff);
        }
    }
    result
}
/// Reconciles the local per-client instance keys with the sets in `data`.
///
/// For each client in `data`, keys known locally but missing from `data` are
/// removed via `remove_instance`; keys present in `data` but unknown locally
/// are returned. Clients unknown locally contribute all their keys to the
/// returned list. `cluster_id` is not used.
fn diff_grpc_distro_client_data(
    &mut self,
    cluster_id: u64,
    data: HashMap<Arc<String>, HashSet<InstanceKey>>,
) -> Vec<InstanceKey> {
    let mut stale_keys = Vec::new();
    let mut new_items: Vec<InstanceKey> = Vec::new();
    for (client_id, remote_keys) in data {
        match self.client_instance_set.get(&client_id) {
            Some(local_keys) => {
                stale_keys.extend(
                    local_keys
                        .difference(&remote_keys)
                        .map(|item| (item.get_service_key(), item.get_short_key())),
                );
                new_items.extend(remote_keys.difference(local_keys).cloned());
            }
            None => new_items.extend(remote_keys),
        }
    }
    // Removal needs `&mut self`, so it happens after the comparison pass.
    for (service_key, short_key) in stale_keys {
        self.remove_instance(&service_key, &short_key, None);
    }
    new_items
}
/// Resolves `instance_keys` to the instances that still exist and did not
/// come from the cluster, preserving the input order.
fn build_distro_instances(&self, instance_keys: Vec<InstanceKey>) -> Vec<Arc<Instance>> {
    instance_keys
        .into_iter()
        .filter_map(|instance_key| {
            let service_key = instance_key.get_service_key();
            let short_key = instance_key.get_short_key();
            self.get_instance(&service_key, &short_key)
        })
        .filter(|instance| !instance.is_from_cluster())
        .collect()
}
/// Installs `range` as the current process range after calling
/// `do_refresh_process_range` on every service whose hash falls inside it.
/// Always returns `Ok(())`.
fn refresh_process_range(&mut self, range: ProcessRange) -> anyhow::Result<()> {
    for (service_key, service) in self.service_map.iter_mut() {
        let hash_value = get_hash_value(service_key) as usize;
        if range.is_range(hash_value) {
            service.do_refresh_process_range();
        }
    }
    self.current_range = Some(range);
    Ok(())
}
/// Applies a received snapshot: first every service detail, then every
/// instance (as a sync update, so no cluster notification is sent).
fn receive_snapshot(&mut self, snapshot: SnapshotForReceive) {
    for service_detail in snapshot.services {
        self.update_service(service_detail);
    }
    for instance in snapshot.instances {
        let service_key = instance.get_service_key();
        self.update_instance(&service_key, instance, None, true, None);
    }
}
/// Tells the node manager to broadcast a `RemoveClientId` route request to
/// the other nodes and to remove `client_id` itself. Does nothing when no
/// node manager is configured.
fn notify_cluster_remove_client_id(&mut self, client_id: Arc<String>) {
    let node_manage = match self.cluster_node_manage.as_ref() {
        Some(node_manage) => node_manage,
        None => return,
    };
    let broadcast = NamingRouteRequest::RemoveClientId {
        client_id: client_id.clone(),
    };
    node_manage.do_send(NodeManageRequest::SendToOtherNodes(broadcast));
    node_manage.do_send(NodeManageRequest::RemoveClientId(client_id));
}
/// Total number of instances across all services.
pub(crate) fn get_instance_size(&self) -> usize {
    self.service_map
        .values()
        .map(|service| service.instances.len())
        .sum()
}
/// Total number of instance keys recorded across all client ids.
pub(crate) fn get_client_instance_set_item_size(&self) -> usize {
    self.client_instance_set
        .values()
        .map(|keys| keys.len())
        .sum()
}
/// Sum of `healthy_timeout_set.len()` over all services.
pub(crate) fn get_healthy_timeout_set_size(&self) -> usize {
    self.service_map
        .values()
        .map(|service| service.healthy_timeout_set.len())
        .sum()
}
/// Sum of `get_healthy_timeout_set_item_size()` over all services.
pub(crate) fn get_healthy_timeout_set_item_size(&self) -> usize {
    self.service_map
        .values()
        .map(|service| service.get_healthy_timeout_set_item_size())
        .sum()
}
/// Returns the sum of `unhealthy_timeout_set.len()` over all services.
pub(crate) fn get_unhealthy_timeout_set_size(&self) -> usize {
    self.service_map
        .values()
        .map(|service| service.unhealthy_timeout_set.len())
        .sum()
}
/// Returns the sum of each service's `get_unhealthy_timeout_set_item_size()`.
pub(crate) fn get_unhealthy_timeout_set_item_size(&self) -> usize {
    self.service_map
        .values()
        .map(|service| service.get_unhealthy_timeout_set_item_size())
        .sum()
}
/// Applies a raft-replicated naming request to the local state.
///
/// * `RegisterInstance` — ignored for ephemeral params; otherwise the instance is
///   stored and the stored copy is returned as `NamingRaftResult::InstanceInfo`
///   (or `NamingRaftResult::None` if it cannot be read back).
/// * `UpdateInstance` — ignored for ephemeral params; otherwise the instance is stored.
/// * `RemoveInstance` — removes the instance identified by the key.
///
/// Every path currently returns `Ok`.
pub fn process_naming_raft_request(
    &mut self,
    req: NamingRaftReq,
) -> anyhow::Result<NamingRaftResult> {
    match req {
        NamingRaftReq::RegisterInstance { param } => {
            if param.ephemeral {
                return Ok(NamingRaftResult::None);
            }
            let instance: Instance = param.into();
            let service_key = instance.get_service_key();
            let short_key = instance.get_short_key();
            self.update_instance(&service_key, instance, None, true, None);
            // Return the stored copy rather than the request payload.
            let stored = self.get_instance(&service_key, &short_key);
            Ok(stored.map_or(NamingRaftResult::None, NamingRaftResult::InstanceInfo))
        }
        NamingRaftReq::UpdateInstance { param } => {
            if param.ephemeral {
                return Ok(NamingRaftResult::None);
            }
            let instance: Instance = param.into();
            let service_key = instance.get_service_key();
            self.update_instance(&service_key, instance, None, true, None);
            Ok(NamingRaftResult::None)
        }
        NamingRaftReq::RemoveInstance(instance_key) => {
            let service_key = instance_key.get_service_key();
            let short_key = instance_key.get_short_key();
            self.remove_instance(&service_key, &short_key, None);
            Ok(NamingRaftResult::None)
        }
    }
}
/// Streams every non-ephemeral instance listed in a service's `perpetual_host_set`
/// to the snapshot writer.
///
/// Each instance is encoded with `to_do()` + `write_message` and sent as a
/// `SnapshotRecordDto` for the `NAMING_INSTANCE_TABLE` tree with an empty key.
///
/// # Errors
/// Returns the encoding error if `write_message` fails; records already sent are
/// not retracted.
fn build_snapshot(
    &self,
    writer: Addr<crate::raft::filestore::raftsnapshot::SnapshotWriterActor>,
) -> anyhow::Result<()> {
    for service in self.service_map.values() {
        for host_key in service.perpetual_host_set.iter() {
            let instance = match service.instances.get(host_key) {
                Some(instance) if !instance.ephemeral => instance,
                _ => continue,
            };
            let mut value = Vec::new();
            {
                // Scope the encoder so its borrow of `value` ends before the move below.
                let mut encoder = Writer::new(&mut value);
                let instance_do = instance.to_do();
                encoder.write_message(&instance_do)?;
            }
            let record = SnapshotRecordDto {
                tree: NAMING_INSTANCE_TABLE.clone(),
                key: Vec::new(),
                value,
                op_type: 0,
            };
            writer.do_send(
                crate::raft::filestore::raftsnapshot::SnapshotWriterRequest::Record(record),
            );
        }
    }
    Ok(())
}
/// Restores one snapshot record into the local state.
///
/// Records whose `tree` is not `NAMING_INSTANCE_TABLE` are ignored. Matching records
/// are decoded as an `InstanceDo`, converted to an `Instance` and applied through
/// `update_instance`.
///
/// # Errors
/// Returns the decoding error if the record value cannot be read as an `InstanceDo`.
fn load_snapshot_record(&mut self, record: SnapshotRecordDto) -> anyhow::Result<()> {
    if record.tree.as_str() != NAMING_INSTANCE_TABLE.as_str() {
        return Ok(());
    }
    let mut reader = BytesReader::from_bytes(&record.value);
    let instance_do: InstanceDo = reader.read_message(&record.value)?;
    let instance = Instance::from_do(instance_do);
    let service_key = instance.get_service_key();
    self.update_instance(&service_key, instance, None, true, None);
    Ok(())
}
/// Handles `RaftApplyDataRequest::LoadCompleted`; currently does nothing and always
/// returns `Ok(())`.
fn load_completed(&mut self) -> anyhow::Result<()> {
Ok(())
}
fn update_instance_to_raft(&mut self, instance: Arc<Instance>, ctx: &mut Context<Self>) {
let raft_router = self.raft_router.clone();
let req = NamingRaftReq::UpdateInstance {
param: instance.as_ref().into(),
};
Self::raft_request_with_retry(req, raft_router, 2)
.into_actor(self)
.map(|res, _, _| {})
.spawn(ctx);
}
fn remove_instance_to_raft(&mut self, instance_key: InstanceKey, ctx: &mut Context<Self>) {
let raft_router = self.raft_router.clone();
let req = NamingRaftReq::RemoveInstance(instance_key);
Self::raft_request_with_retry(req, raft_router, 2)
.into_actor(self)
.map(|res, _, _| {})
.spawn(ctx);
}
/// Sends `req` through the raft router, retrying on failure.
///
/// Up to `retry` attempts are made, each followed by a 3 second sleep when it fails;
/// if none succeeds, one final attempt is made and its result (success or error) is
/// returned. A non-positive `retry` therefore means a single attempt.
///
/// # Errors
/// Fails immediately with "raft_router is None" when no router is supplied; otherwise
/// returns the error of the final attempt.
pub(crate) async fn raft_request_with_retry(
    req: NamingRaftReq,
    raft_router: Option<Arc<RaftRequestRoute>>,
    retry: i32,
) -> anyhow::Result<NamingRaftResult> {
    if raft_router.is_none() {
        return Err(anyhow::anyhow!("raft_router is None"));
    }
    for _ in 0..retry {
        match Self::raft_request(req.clone(), raft_router.clone()).await {
            Ok(resp) => return Ok(resp),
            Err(_) => tokio::time::sleep(Duration::from_secs(3)).await,
        }
    }
    Self::raft_request(req, raft_router).await
}
pub(crate) async fn raft_request(
req: NamingRaftReq,
raft_router: Option<Arc<RaftRequestRoute>>,
) -> anyhow::Result<NamingRaftResult> {
let raft_router = if let Some(r) = raft_router {
r
} else {
return Err(anyhow::anyhow!("raft_router is None"));
};
let res = raft_router
.request(ClientRequest::NamingReq { req })
.await?;
if let ClientResponse::NamingResp { resp } = res {
Ok(resp)
} else {
Err(anyhow::anyhow!("raft_request error"))
}
}
/// Sends every non-ephemeral instance listed in a service's `perpetual_host_set` to
/// the transfer writer.
///
/// Each instance is encoded with `to_do()` + `write_message` and sent as a
/// `TransferRecordDto` for `NAMING_INSTANCE_TABLE` with an empty key and `table_id` 0.
///
/// # Errors
/// Returns the encoding error if `write_message` fails; records already sent are
/// not retracted.
pub(crate) fn transfer_backup(&self, writer: Addr<TransferWriterActor>) -> anyhow::Result<()> {
    use crate::common::constant::NAMING_INSTANCE_TABLE;
    use crate::transfer::model::{TransferRecordDto, TransferWriterRequest};
    for service in self.service_map.values() {
        let persistent_instances = service
            .perpetual_host_set
            .iter()
            .filter_map(|host| service.instances.get(host))
            .filter(|instance| !instance.ephemeral);
        for instance in persistent_instances {
            let mut value = Vec::new();
            {
                // Scope the encoder so its borrow of `value` ends before the move below.
                let mut encoder = Writer::new(&mut value);
                let instance_do = instance.to_do();
                encoder.write_message(&instance_do)?;
            }
            writer.do_send(TransferWriterRequest::AddRecord(TransferRecordDto {
                table_name: Some(NAMING_INSTANCE_TABLE.clone()),
                key: Vec::new(),
                value,
                table_id: 0,
            }));
        }
    }
    Ok(())
}
}
/// Commands accepted by `NamingActor`; each one is answered with
/// `anyhow::Result<NamingResult>`.
#[derive(Debug, Message)]
#[rtype(result = "anyhow::Result<NamingResult>")]
pub enum NamingCmd {
/// Update an instance with an optional update tag. The handler passes its own actor
/// address to `update_instance`; may be answered with `NamingResult::RewriteToCluster`.
Update(Instance, Option<InstanceUpdateTag>),
/// Like `Update`, but the handler calls `update_instance` with the boolean flag set to
/// `true` and without an actor address (presumably for updates received via sync).
UpdateFromSync(Instance, Option<InstanceUpdateTag>),
/// Update every instance in the list (no tag, no actor address).
UpdateBatch(Vec<Instance>),
/// Remove an instance on behalf of its `client_id`; when the removal reports
/// `UpdatePerpetualType::Remove`, a raft removal request is also submitted.
Delete(Instance),
/// Remove every instance in the list on behalf of its `client_id`.
DeleteBatch(Vec<Instance>),
/// Look up the stored instance that has the same service key and short key.
Query(Instance),
/// Query the instance list: service key, cluster filter string, only-healthy flag and
/// an optional address that is registered as a listener before the query.
QueryList(ServiceKey, String, bool, Option<SocketAddr>),
/// Query one page of a service's instances.
QueryInstancePage {
service_key: ServiceKey,
cluster: String,
only_healthy: bool,
page_size: usize,
page_index: usize,
},
/// Pick a single instance of the service via `select_one_instance(true, true)`.
SelectOneInstance(ServiceKey),
/// Query all instances of the service with no cluster filter.
QueryAllInstanceList(ServiceKey),
/// Same arguments as `QueryList`, answered with the instance list rendered as a string.
QueryListString(ServiceKey, String, bool, Option<SocketAddr>),
/// Query service info: service key, cluster filter string and only-healthy flag.
QueryServiceInfo(ServiceKey, String, bool),
/// Query a page of services: key, page size, page index.
QueryServicePage(ServiceKey, usize, usize),
/// Query a page of subscribers: key, page size, page index.
QueryServiceSubscribersPage(ServiceKey, usize, usize),
/// Query a page of subscribers selected by a `ServiceQueryParam`.
QueryServiceSubscribersPageV2(ServiceQueryParam),
/// Query a page of service info selected by a `ServiceQueryParam`.
QueryServiceInfoPage(ServiceQueryParam),
/// Query the info of a single service, if it exists.
QueryServiceOnly(ServiceKey),
/// Update a service locally and, when a node manager is configured, send the update
/// to the other nodes.
UpdateService(ServiceDetailDto),
/// Update a service locally only.
UpdateServiceFromCluster(ServiceDetailDto),
/// Remove a service via `remove_empty_service`; its error is returned to the sender.
RemoveService(ServiceKey),
/// Run the actor's `time_check`.
PeekListenerTimeout,
/// Send the service's current instance map to the listener actor together with the
/// given id.
NotifyListener(ServiceKey, u64),
/// Add subscriptions for the client id, then notify each subscribed service key.
Subscribe(Vec<NamingListenerItem>, Arc<String>),
/// Remove subscriptions for the client id.
RemoveSubscribe(Vec<NamingListenerItem>, Arc<String>),
/// Remove a client's subscriptions and instances, then notify the cluster.
RemoveClient(Arc<String>),
/// Remove subscriptions and instances of several clients, without cluster notification.
RemoveClientsFromCluster(Vec<Arc<String>>),
/// Remove subscriptions and instances of one client, without cluster notification.
RemoveClientFromCluster(Arc<String>),
/// Query, for every client id, how many entries its instance set holds.
QueryClientInstanceCount,
/// Currently a no-op: the handler answers `NamingResult::NULL`.
QueryDalAddr,
/// Build a snapshot for the given process ranges.
QuerySnapshot(Vec<ProcessRange>),
/// Refresh services inside the range and store it as the current range.
ClusterRefreshProcessRange(ProcessRange),
/// Apply a received snapshot, then re-run the refresh for the current range (if any).
ReceiveSnapshot(SnapshotForReceive),
/// Query the gRPC distro data (answered as `DistroData::ClientInstances`).
QueryGrpcDistroData,
/// Diff the given distro data against local state; only `DistroData::ClientInstances`
/// is processed, anything else is answered with `NamingResult::NULL`.
DiffGrpcDistroData {
cluster_id: u64,
data: DistroData,
},
/// Resolve instance keys to stored instances that are not from the cluster.
QueryDistroInstanceSnapshot(Vec<InstanceKey>),
/// Forwarded to `update_perpetual_health` with the host, service keys and success flag.
PerpetualHostSniffing {
host: InstanceShortKey,
service_keys: Vec<ServiceKey>,
success: bool,
},
/// Submit an instance update to raft in the background.
NotifyUpdateRaftInstance(Arc<Instance>),
/// Submit an instance removal to raft in the background.
NotifyRemoveRaftInstance(InstanceKey),
/// Forwarded to `init_service_metadata` with the service key and metadata records.
InitInstanceMeta(ServiceKey, Vec<InstanceMetaDto>),
/// Query the instance metadata list of all services.
QueryAllServiceInstanceMetaData,
}
/// Results produced by the `NamingCmd` handler of `NamingActor`.
pub enum NamingResult {
/// No payload; the default answer for commands that only perform an action.
NULL,
/// The stored instance found by `NamingCmd::Query`.
Instance(Arc<Instance>),
/// Answer to `NamingCmd::SelectOneInstance`; `None` when nothing was selected.
SelectInstance(Option<Arc<Instance>>),
/// Answer to `NamingCmd::QueryList` and `NamingCmd::QueryAllInstanceList`.
InstanceList(Vec<Arc<Instance>>),
/// Answer to `NamingCmd::QueryListString`.
InstanceListString(String),
/// Answer to `NamingCmd::QueryServiceInfo`.
ServiceInfo(ServiceInfo),
/// Answer to `NamingCmd::QueryServicePage` (presumably total count plus the page items).
ServicePage((usize, Vec<Arc<String>>)),
/// Answer to the subscriber page queries (presumably total count plus the page items).
ServiceSubscribersPage((usize, Vec<SubscriberInfoDto>)),
/// Answer to `NamingCmd::QueryServiceInfoPage` (presumably total count plus the page items).
ServiceInfoPage((usize, Vec<ServiceInfoDto>)),
/// Answer to `NamingCmd::QueryInstancePage` (presumably total count plus the page items).
InstanceInfoPage((usize, Vec<Arc<Instance>>)),
/// Answer to `NamingCmd::QueryServiceOnly`; `None` when the service does not exist.
ServiceDto(Option<ServiceInfoDto>),
/// Answer to `NamingCmd::QueryClientInstanceCount`: (client id, instance-set size) pairs.
ClientInstanceCount(Vec<(Arc<String>, usize)>),
/// Returned by `Update`/`UpdateFromSync` when `update_instance` reports
/// `UpdateInstanceType::UpdateOtherClusterMetaData(node_id, instance)`.
RewriteToCluster(u64, Instance),
/// Answer to `NamingCmd::QuerySnapshot`.
Snapshot(SnapshotForSend),
/// Answer to `NamingCmd::QueryGrpcDistroData`.
GrpcDistroData(DistroData),
/// Answer to `NamingCmd::DiffGrpcDistroData`.
DiffDistroData(DistroData),
/// Answer to `NamingCmd::QueryDistroInstanceSnapshot`.
DistroInstancesSnapshot(Vec<Arc<Instance>>),
/// Answer to `NamingCmd::QueryAllServiceInstanceMetaData`.
AllServiceInstanceMetaData(Vec<(ServiceKey, Vec<InstanceMetaDto>)>),
}
/// Supervision hook for `NamingActor`: the restart callback only logs a warning.
impl Supervised for NamingActor {
fn restarting(&mut self, _ctx: &mut <Self as Actor>::Context) {
log::warn!("NamingActor restart ...");
}
}
/// Dispatches every `NamingCmd` to the matching `NamingActor` operation and wraps the
/// outcome in a `NamingResult`.
impl Handler<NamingCmd> for NamingActor {
type Result = anyhow::Result<NamingResult>;
/// Handles one command. Only `RemoveService` and `ClusterRefreshProcessRange`
/// propagate an error with `?`; every other arm answers `Ok`.
fn handle(&mut self, msg: NamingCmd, ctx: &mut Context<Self>) -> Self::Result {
match msg {
// Local update: the actor's own address is handed to `update_instance`.
NamingCmd::Update(instance, tag) => {
let tag = self.update_instance(
&instance.get_service_key(),
instance,
tag,
false,
Some(ctx.address()),
);
// The caller is asked to forward the instance to `node_id` in this case.
if let UpdateInstanceType::UpdateOtherClusterMetaData(node_id, instance) = tag {
Ok(NamingResult::RewriteToCluster(node_id, instance))
} else {
Ok(NamingResult::NULL)
}
}
// Same as `Update`, but with the boolean flag set and no actor address.
NamingCmd::UpdateFromSync(instance, tag) => {
let tag =
self.update_instance(&instance.get_service_key(), instance, tag, true, None);
if let UpdateInstanceType::UpdateOtherClusterMetaData(node_id, instance) = tag {
Ok(NamingResult::RewriteToCluster(node_id, instance))
} else {
Ok(NamingResult::NULL)
}
}
// Bulk update; the per-instance result of `update_instance` is ignored.
NamingCmd::UpdateBatch(instances) => {
for mut instance in instances {
self.update_instance(&instance.get_service_key(), instance, None, true, None);
}
Ok(NamingResult::NULL)
}
// Remove locally; a perpetual removal is additionally submitted to raft.
NamingCmd::Delete(instance) => {
let (_, perpetual_tag) = self.remove_instance(
&instance.get_service_key(),
&instance.get_short_key(),
Some(&instance.client_id),
);
if let UpdatePerpetualType::Remove = perpetual_tag {
let instance_key = instance.get_instance_key();
self.remove_instance_to_raft(instance_key, ctx);
}
Ok(NamingResult::NULL)
}
// NOTE(review): unlike `Delete`, the removal result is ignored here, so nothing is
// submitted to raft for perpetual instances — confirm this is intended.
NamingCmd::DeleteBatch(instances) => {
for instance in instances {
self.remove_instance(
&instance.get_service_key(),
&instance.get_short_key(),
Some(&instance.client_id),
);
}
Ok(NamingResult::NULL)
}
// Answers `NULL` when no matching instance is stored.
NamingCmd::Query(instance) => {
if let Some(i) =
self.get_instance(&instance.get_service_key(), &instance.get_short_key())
{
return Ok(NamingResult::Instance(i));
}
Ok(NamingResult::NULL)
}
// Registers the caller as a listener (when an address is given) before querying.
NamingCmd::QueryList(service_key, cluster_str, only_healthy, addr) => {
let cluster_names = NamingUtils::split_filters(&cluster_str);
if let Some(addr) = addr {
self.update_listener(&service_key, &cluster_names, addr, only_healthy);
}
let list = self.get_instance_list(&service_key, &cluster_str, only_healthy);
Ok(NamingResult::InstanceList(list))
}
NamingCmd::QueryInstancePage {
service_key,
cluster,
only_healthy,
page_size,
page_index,
} => {
// NOTE(review): `cluster_names` is computed but never used in this arm.
let cluster_names = NamingUtils::split_filters(&cluster);
Ok(NamingResult::InstanceInfoPage(self.get_instance_page(
&service_key,
&cluster,
only_healthy,
page_size,
page_index,
)))
}
// Same flow as `QueryList`, but the answer is the string form of the list.
NamingCmd::QueryListString(service_key, cluster_str, only_healthy, addr) => {
let cluster_names = NamingUtils::split_filters(&cluster_str);
if let Some(addr) = addr {
self.update_listener(&service_key, &cluster_names, addr, only_healthy);
}
let data = self.get_instance_list_string(&service_key, cluster_str, only_healthy);
Ok(NamingResult::InstanceListString(data))
}
NamingCmd::QueryServiceInfo(service_key, cluster_str, only_healthy) => {
// NOTE(review): `cluster_names` is computed but never used in this arm.
let cluster_names = NamingUtils::split_filters(&cluster_str);
let service_info = self.get_service_info(&service_key, cluster_str, only_healthy);
Ok(NamingResult::ServiceInfo(service_info))
}
NamingCmd::QueryServicePage(service_key, page_size, page_index) => {
Ok(NamingResult::ServicePage(self.get_service_list(
page_size,
page_index,
&service_key,
)))
}
NamingCmd::QueryServiceSubscribersPage(service_key, page_size, page_index) => {
Ok(NamingResult::ServiceSubscribersPage(
self.get_subscribers_list_v2(page_size, page_index, &service_key),
))
}
NamingCmd::QueryServiceInfoPage(param) => Ok(NamingResult::ServiceInfoPage(
self.get_service_info_page(param),
)),
NamingCmd::QueryServiceOnly(service_key) => Ok(NamingResult::ServiceDto(
self.get_service(&service_key).map(|s| s.get_service_info()),
)),
NamingCmd::QueryServiceSubscribersPageV2(param) => Ok(
NamingResult::ServiceSubscribersPage(self.get_subscribers_list_by_param(param)),
),
NamingCmd::PeekListenerTimeout => {
self.time_check();
Ok(NamingResult::NULL)
}
// Pushes the service's instance map to the listener actor, if one is configured.
NamingCmd::NotifyListener(service_key, id) => {
if let Some(listener_addr) = self.listener_addr.as_ref() {
let map = self.get_instance_map(&service_key, vec![], false);
let msg = NamingListenerCmd::Notify(service_key, "".to_string(), map, id);
listener_addr.do_send(msg);
}
Ok(NamingResult::NULL)
}
// Register the subscriptions, then notify each subscribed service key.
NamingCmd::Subscribe(items, client_id) => {
self.subscriber.add_subscribe(client_id, items.clone());
for item in items {
self.subscriber.notify(item.service_key);
}
Ok(NamingResult::NULL)
}
NamingCmd::RemoveSubscribe(items, client_id) => {
self.subscriber.remove_subscribe(client_id, items);
Ok(NamingResult::NULL)
}
// Local client removal: clean up, then tell the cluster about it.
NamingCmd::RemoveClient(client_id) => {
self.subscriber.remove_client_subscribe(client_id.clone());
self.remove_client_instance(&client_id);
self.notify_cluster_remove_client_id(client_id);
Ok(NamingResult::NULL)
}
// Cluster-originated removals: clean up only, no cluster notification.
NamingCmd::RemoveClientsFromCluster(client_ids) => {
for client_id in client_ids {
self.subscriber.remove_client_subscribe(client_id.clone());
self.remove_client_instance(&client_id);
}
Ok(NamingResult::NULL)
}
NamingCmd::RemoveClientFromCluster(client_id) => {
self.subscriber.remove_client_subscribe(client_id.clone());
self.remove_client_instance(&client_id);
Ok(NamingResult::NULL)
}
// Currently a no-op.
NamingCmd::QueryDalAddr => {
Ok(NamingResult::NULL)
}
NamingCmd::UpdateServiceFromCluster(service_info) => {
self.update_service(service_info);
Ok(NamingResult::NULL)
}
// Update locally, then send the same service detail to the other nodes.
NamingCmd::UpdateService(service_info) => {
self.update_service(service_info.clone());
if let Some(node_manage) = self.cluster_node_manage.as_ref() {
node_manage.do_send(NodeManageRequest::SendToOtherNodes(
NamingRouteRequest::SyncUpdateService {
service: service_info,
},
));
}
Ok(NamingResult::NULL)
}
// The error of `remove_empty_service` is returned to the sender.
NamingCmd::RemoveService(service_key) => {
self.remove_empty_service(service_key)?;
Ok(NamingResult::NULL)
}
// An unknown service yields an empty list rather than an error.
NamingCmd::QueryAllInstanceList(key) => {
if let Some(service) = self.service_map.get(&key) {
Ok(NamingResult::InstanceList(service.get_instance_list(
vec![],
false,
false,
)))
} else {
Ok(NamingResult::InstanceList(vec![]))
}
}
NamingCmd::SelectOneInstance(service_key) => {
let v = if let Some(service) = self.service_map.get(&service_key) {
service.select_one_instance(true, true)
} else {
None
};
Ok(NamingResult::SelectInstance(v))
}
NamingCmd::QueryClientInstanceCount => {
let mut client_instance_count = Vec::with_capacity(self.client_instance_set.len());
for (k, v) in &self.client_instance_set {
client_instance_count.push((k.clone(), v.len()));
}
Ok(NamingResult::ClientInstanceCount(client_instance_count))
}
NamingCmd::QuerySnapshot(ranges) => {
let res = self.build_snapshot_data(ranges);
Ok(NamingResult::Snapshot(res))
}
NamingCmd::ClusterRefreshProcessRange(range) => {
self.refresh_process_range(range)?;
Ok(NamingResult::NULL)
}
// After applying the snapshot, re-run the refresh for the current range; a
// failure of that refresh is deliberately ignored (`.ok()`).
NamingCmd::ReceiveSnapshot(snapshot) => {
self.receive_snapshot(snapshot);
if let Some(range) = &self.current_range {
self.refresh_process_range(range.clone()).ok();
}
Ok(NamingResult::NULL)
}
NamingCmd::QueryGrpcDistroData => {
let res = self.query_grpc_distro_data();
Ok(NamingResult::GrpcDistroData(DistroData::ClientInstances(
res,
)))
}
// Only `DistroData::ClientInstances` can be diffed; other variants answer `NULL`.
NamingCmd::DiffGrpcDistroData { data, cluster_id } => {
if let DistroData::ClientInstances(data) = data {
let res = self.diff_grpc_distro_client_data(cluster_id, data);
Ok(NamingResult::DiffDistroData(
DistroData::DiffClientInstances(res),
))
} else {
Ok(NamingResult::NULL)
}
}
NamingCmd::QueryDistroInstanceSnapshot(instance_keys) => {
let instances = self.build_distro_instances(instance_keys);
Ok(NamingResult::DistroInstancesSnapshot(instances))
}
NamingCmd::PerpetualHostSniffing {
host,
service_keys,
success,
} => {
self.update_perpetual_health(host, service_keys, success);
Ok(NamingResult::NULL)
}
// The two raft notifications are fire-and-forget: the request runs on `ctx`.
NamingCmd::NotifyUpdateRaftInstance(instance) => {
self.update_instance_to_raft(instance, ctx);
Ok(NamingResult::NULL)
}
NamingCmd::NotifyRemoveRaftInstance(instance_key) => {
self.remove_instance_to_raft(instance_key, ctx);
Ok(NamingResult::NULL)
}
NamingCmd::InitInstanceMeta(service_key, records) => {
self.init_service_metadata(service_key, records);
Ok(NamingResult::NULL)
}
NamingCmd::QueryAllServiceInstanceMetaData => {
let data = self.get_service_metadata_list();
Ok(NamingResult::AllServiceInstanceMetaData(data))
}
}
}
}
/// Lets `NamingActor` receive `NamingRaftReq` messages directly.
impl Handler<NamingRaftReq> for NamingActor {
type Result = anyhow::Result<NamingRaftResult>;
/// Delegates to `process_naming_raft_request`; the actor context is not used.
fn handle(&mut self, msg: NamingRaftReq, _ctx: &mut Context<Self>) -> Self::Result {
self.process_naming_raft_request(msg)
}
}
/// Routes raft snapshot build/load requests to the matching `NamingActor` method.
impl Handler<RaftApplyDataRequest> for NamingActor {
    type Result = anyhow::Result<RaftApplyDataResponse>;

    /// Runs the requested snapshot step and answers `RaftApplyDataResponse::None` on
    /// success; an error from the step is returned unchanged.
    fn handle(&mut self, msg: RaftApplyDataRequest, _ctx: &mut Context<Self>) -> Self::Result {
        let outcome = match msg {
            RaftApplyDataRequest::BuildSnapshot(writer) => self.build_snapshot(writer),
            RaftApplyDataRequest::LoadSnapshotRecord(record) => self.load_snapshot_record(record),
            RaftApplyDataRequest::LoadCompleted => self.load_completed(),
        };
        outcome.map(|()| RaftApplyDataResponse::None)
    }
}
// Checks how a registered instance ages out of query results as the configured
// timeouts (2000 ms health timeout, 4000 ms instance timeout) elapse.
#[actix_rt::test]
async fn query_healthy_instances() {
use super::*;
use tokio::net::UdpSocket;
let mut naming = NamingActor::new();
// Short timeouts so the test only needs to sleep a few seconds.
naming.sys_config.instance_health_timeout_millis = 2000;
naming.sys_config.instance_timeout_millis = 4000;
let mut instance = Instance::new("127.0.0.1".to_owned(), 8080);
instance.namespace_id = Arc::new("public".to_owned());
instance.service_name = Arc::new("foo".to_owned());
instance.group_name = Arc::new("DEFUALT".to_owned());
instance.cluster_name = "DEFUALT".to_owned();
instance.init();
let key = instance.get_service_key();
naming.update_instance(&key, instance, None, false, None);
if let Some(service) = naming.service_map.get_mut(&key) {
service.protect_threshold = 0.1;
}
println!("-------------");
// Freshly registered: the only-healthy query must return the instance.
// NOTE(review): this query and the next one pass the same empty cluster filter even
// though the first is printed as the "DEFUALT list" — confirm that is intended.
let items = naming.get_instance_list(&key, "", true);
assert!(!items.is_empty());
println!("DEFUALT list:{}", serde_json::to_string(&items).unwrap());
let items = naming.get_instance_list(&key, "", true);
assert!(!items.is_empty());
println!(
"empty cluster list:{}",
serde_json::to_string(&items).unwrap()
);
// Past the 2000 ms health timeout: still listed when unhealthy instances are allowed.
tokio::time::sleep(Duration::from_millis(2100)).await;
naming.time_check();
println!("-------------");
let items = naming.get_instance_list(&key, "", false);
assert!(!items.is_empty());
println!(
"empty cluster list:{}",
serde_json::to_string(&items).unwrap()
);
// Past the 4000 ms instance timeout: the instance must no longer be listed.
tokio::time::sleep(Duration::from_millis(2100)).await;
naming.time_check();
println!("-------------");
let items = naming.get_instance_list(&key, "", false);
assert!(items.is_empty());
println!(
"empty cluster list:{}",
serde_json::to_string(&items).unwrap()
);
}
// Adding a service without instances bumps the namespace index's service count, and
// removing that empty service brings the count back to zero.
#[test]
fn test_add_service() {
    let mut naming = NamingActor::new();
    let service_key = ServiceKey::new("1", "1", "1");
    let service_info = ServiceDetailDto {
        namespace_id: service_key.namespace_id.clone(),
        service_name: service_key.service_name.clone(),
        group_name: service_key.group_name.clone(),
        metadata: Default::default(),
        protect_threshold: Some(0.5),
        ..Default::default()
    };
    // `assert_eq!` reports the actual count when the expectation fails.
    assert_eq!(naming.namespace_index.service_size, 0);
    naming.update_service(service_info);
    assert_eq!(naming.namespace_index.service_size, 1);
    naming.remove_empty_service(service_key).unwrap();
    assert_eq!(naming.namespace_index.service_size, 0);
}
// A service that still has an instance cannot be removed with `remove_empty_service`;
// once the instance is removed, the service can be, and the service count drops to zero.
#[test]
fn test_remove_has_instance_service() {
    let mut naming = NamingActor::new();
    let mut instance = Instance::new("127.0.0.1".to_owned(), 8080);
    instance.namespace_id = Arc::new("public".to_owned());
    instance.service_name = Arc::new("foo".to_owned());
    instance.group_name = Arc::new("DEFUALT".to_owned());
    instance.cluster_name = "DEFUALT".to_owned();
    instance.init();
    let service_key = instance.get_service_key();
    naming.update_instance(&service_key, instance.clone(), None, false, None);
    let service_info = ServiceDetailDto {
        namespace_id: service_key.namespace_id.clone(),
        service_name: service_key.service_name.clone(),
        group_name: service_key.group_name.clone(),
        metadata: Default::default(),
        protect_threshold: Some(0.5),
        ..Default::default()
    };
    // Registering the instance already created the service; updating it must not add another.
    assert_eq!(naming.namespace_index.service_size, 1);
    naming.update_service(service_info);
    assert_eq!(naming.namespace_index.service_size, 1);
    // Removal is refused while the instance is still registered.
    assert!(naming.remove_empty_service(service_key.clone()).is_err());
    assert_eq!(naming.namespace_index.service_size, 1);
    // Removing the instance alone keeps the (now empty) service.
    naming.remove_instance(&service_key, &instance.get_short_key(), None);
    assert_eq!(naming.namespace_index.service_size, 1);
    assert!(naming.remove_empty_service(service_key.clone()).is_ok());
    assert_eq!(naming.namespace_index.service_size, 0);
}