use crate::app::model::AppRouteRequest;
use crate::common::datetime_utils::now_millis;
use crate::raft::cluster::model::{RouterRequest, VoteChangeRequest, VoteInfo};
use crate::raft::cluster::router_request;
use crate::raft::network::factory::RaftClusterRequestSender;
use crate::schedule::core::ScheduleManager;
use crate::user::core::UserManager;
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
/// Status flag stored on every cluster node record.
///
/// `Valid` is the value produced by the type's `Default` implementation and
/// the value assigned to nodes when they are first registered.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NodeStatus {
    /// The node is treated as usable.
    Valid,
    /// The node is treated as not usable.
    Invalid,
}
impl Default for NodeStatus {
fn default() -> Self {
Self::Valid
}
}
/// Public view of a cluster node, as handed out in manager responses.
///
/// It carries the same data as `ClusterInnerNode` minus the
/// `last_active_time` bookkeeping field.
#[derive(Clone, Debug, Default)]
pub struct ClusterNode {
    /// Node id.
    pub id: u64,
    /// Position of the node within the manager's id-ordered node table.
    pub index: u64,
    /// `true` when this record describes the node the manager runs on.
    pub is_local: bool,
    /// Address of the node (shared, cheaply clonable string).
    pub addr: Arc<String>,
    /// Current status of the node.
    pub status: NodeStatus,
}
/// Node record kept inside the manager's node table.
///
/// Converts into the public `ClusterNode` view, which drops
/// `last_active_time`.
#[derive(Clone, Debug, Default)]
pub struct ClusterInnerNode {
    /// Node id.
    pub id: u64,
    /// Position of the node within the manager's id-ordered node table.
    pub index: u64,
    /// `true` when this record describes the node the manager runs on.
    pub is_local: bool,
    /// Address of the node (shared, cheaply clonable string).
    pub addr: Arc<String>,
    /// Current status of the node.
    pub status: NodeStatus,
    /// Timestamp taken from `now_millis()` when the record is created by the
    /// manager; `0` for records built through `Default`.
    pub last_active_time: u64,
}
impl ClusterInnerNode {
    /// Reports whether this node should be treated as valid.
    ///
    /// The local node always counts as valid; any other node counts as valid
    /// only while its status is [`NodeStatus::Valid`].
    #[allow(dead_code)]
    pub(crate) fn is_valid(&self) -> bool {
        if self.is_local {
            return true;
        }
        self.status == NodeStatus::Valid
    }
}
impl From<ClusterInnerNode> for ClusterNode {
fn from(value: ClusterInnerNode) -> Self {
Self {
id: value.id,
index: value.index,
is_local: value.is_local,
addr: value.addr,
status: value.status,
}
}
}
/// Actor that tracks the nodes of the cluster and the latest vote result.
///
/// The optional handles start as `None` and are filled in by the `Inject`
/// implementation.
#[bean(inject)]
pub struct ClusterNodeManager {
    /// Id of the node this manager runs on.
    local_id: u64,
    /// Known nodes keyed by node id; the ordered map defines each node's `index`.
    all_nodes: BTreeMap<u64, ClusterInnerNode>,
    /// Sender used to route requests to other nodes.
    cluster_sender: Option<Arc<RaftClusterRequestSender>>,
    /// Receives vote-change notifications.
    schedule_manager: Option<Addr<ScheduleManager>>,
    /// Receives vote-change notifications.
    user_manager: Option<Addr<UserManager>>,
    /// Set once the first node-list update has been processed.
    first_init: bool,
    /// Vote information recorded by the most recent `UpdateVoted` request.
    last_vote: VoteInfo,
}
impl ClusterNodeManager {
pub fn new(local_id: u64) -> Self {
Self {
local_id,
all_nodes: BTreeMap::new(),
cluster_sender: None,
schedule_manager: None,
user_manager: None,
first_init: false,
last_vote: VoteInfo::default(),
}
}
fn update_nodes(&mut self, nodes: Vec<(u64, Arc<String>)>, ctx: &mut Context<Self>) {
if self.cluster_sender.is_none() {
log::warn!("InnerNodeManage cluster_sender is none");
return;
}
let new_sets: HashSet<u64> = nodes.iter().map(|e| e.0.to_owned()).collect();
let mut dels = vec![];
for key in self.all_nodes.keys() {
if !new_sets.contains(key) {
dels.push(*key);
}
}
for key in dels {
self.all_nodes.remove(&key);
}
let now = now_millis();
for (key, addr) in nodes {
if let Some(node) = self.all_nodes.get_mut(&key) {
node.addr = addr;
} else {
let is_local = self.local_id == key;
let node = ClusterInnerNode {
id: key,
index: 0,
is_local,
addr,
status: NodeStatus::Valid,
last_active_time: now,
};
self.all_nodes.insert(key, node);
}
}
let local_node = self.get_this_node();
self.all_nodes.entry(self.local_id).or_insert(local_node);
self.update_nodes_index();
if !self.first_init {
self.first_init = true;
ctx.run_later(Duration::from_millis(1000), |act, _ctx| {
act.load_snapshot_from_node();
});
}
}
fn update_nodes_index(&mut self) {
for (i, value) in self.all_nodes.values_mut().enumerate() {
value.index = i as u64;
}
}
fn load_snapshot_from_node(&self) {
}
fn get_this_node(&self) -> ClusterInnerNode {
if let Some(node) = self.all_nodes.get(&self.local_id) {
node.to_owned()
} else {
ClusterInnerNode {
id: self.local_id,
is_local: true,
..Default::default()
}
}
}
fn get_all_nodes(&self) -> Vec<ClusterNode> {
if self.all_nodes.is_empty() {
vec![self.get_this_node().into()]
} else {
self.all_nodes.values().cloned().map(|e| e.into()).collect()
}
}
fn notify_vote_change(&self) {
let local_is_master = self.local_id == self.last_vote.voted_for;
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
schedule_manager.do_send(VoteChangeRequest::VoteChange {
vote_info: self.last_vote.clone(),
local_is_master,
});
}
if let Some(user_manager) = self.user_manager.as_ref() {
user_manager.do_send(VoteChangeRequest::VoteChange {
vote_info: self.last_vote.clone(),
local_is_master,
});
}
}
async fn do_send_to_other_nodes(
req: RouterRequest,
addrs: Vec<Arc<String>>,
sender: Arc<RaftClusterRequestSender>,
) -> anyhow::Result<()> {
for addr in addrs {
router_request(req.clone(), addr, &sender).await?;
}
Ok(())
}
fn send_to_other_nodes(&mut self, req: ToOtherRequest, ctx: &mut Context<Self>) {
let req = match req {
ToOtherRequest::AppRouteRequest(req) => RouterRequest::AppRouteRequest(req),
};
let mut addrs = Vec::with_capacity(self.all_nodes.len());
for node in self.all_nodes.values() {
if node.is_local {
continue;
}
addrs.push(node.addr.clone());
}
if let Some(cluster_sender) = self.cluster_sender.clone() {
Self::do_send_to_other_nodes(req, addrs, cluster_sender)
.into_actor(self)
.map(|_, _, _| {})
.spawn(ctx)
}
}
}
/// Runs the manager as an actix actor on a plain `Context`.
impl Actor for ClusterNodeManager {
    type Context = Context<Self>;

    /// Logs that the actor has started; no other setup happens here.
    fn started(&mut self, _: &mut Self::Context) {
        log::info!("ClusterNodeManager started!");
    }
}
/// Dependency wiring for the manager.
impl Inject for ClusterNodeManager {
    type Context = Context<Self>;

    /// Pulls the cluster sender bean and the schedule/user manager actor
    /// addresses out of `factory_data`; each one stays `None` when the
    /// factory does not provide it.
    fn inject(
        &mut self,
        factory_data: FactoryData,
        _factory: BeanFactory,
        _ctx: &mut Self::Context,
    ) {
        let cluster_sender = factory_data.get_bean();
        let schedule_manager = factory_data.get_actor();
        let user_manager = factory_data.get_actor();

        self.cluster_sender = cluster_sender;
        self.schedule_manager = schedule_manager;
        self.user_manager = user_manager;
    }
}
/// Requests that the manager can broadcast to the other cluster nodes.
#[derive(Debug)]
pub enum ToOtherRequest {
    /// Forwarded to peers as `RouterRequest::AppRouteRequest`.
    AppRouteRequest(AppRouteRequest),
}
/// Messages understood by `ClusterNodeManager`.
///
/// Every request is answered with `anyhow::Result<NodeManageResponse>`.
#[derive(Message, Debug)]
#[rtype(result = "anyhow::Result<NodeManageResponse>")]
pub enum NodeManageRequest {
    /// Replace the known node set with the given `(id, address)` pairs.
    UpdateNodes(Vec<(u64, Arc<String>)>),
    /// Record a new vote result and notify interested actors.
    UpdateVoted { current_term: u64, voted_for: u64 },
    /// Ask for the local node.
    GetThisNode,
    /// Ask for all known nodes.
    GetAllNodes,
    /// Ask for the node with the given id.
    GetNode(u64),
    /// Broadcast a request to every non-local node.
    SendToOtherNodes(ToOtherRequest),
}
/// Replies produced by `ClusterNodeManager` for a `NodeManageRequest`.
pub enum NodeManageResponse {
    /// The request carries no payload in its reply.
    None,
    /// The local node.
    ThisNode(ClusterNode),
    /// The requested node, or `None` when the id is unknown.
    Node(Option<ClusterNode>),
    /// Every known node.
    AllNodes(Vec<ClusterNode>),
}
impl Handler<NodeManageRequest> for ClusterNodeManager {
type Result = anyhow::Result<NodeManageResponse>;
fn handle(&mut self, msg: NodeManageRequest, ctx: &mut Self::Context) -> Self::Result {
match msg {
NodeManageRequest::UpdateNodes(nodes) => {
log::info!("InnerNodeManage UpdateNodes,size:{}", nodes.len());
self.update_nodes(nodes, ctx);
Ok(NodeManageResponse::None)
}
NodeManageRequest::GetThisNode => {
Ok(NodeManageResponse::ThisNode(self.get_this_node().into()))
}
NodeManageRequest::GetAllNodes => {
Ok(NodeManageResponse::AllNodes(self.get_all_nodes()))
}
NodeManageRequest::GetNode(node_id) => {
let node = self.all_nodes.get(&node_id).map(|e| e.to_owned().into());
Ok(NodeManageResponse::Node(node))
}
NodeManageRequest::UpdateVoted {
voted_for,
current_term,
} => {
log::info!(
"UpdateVoted,local node_id:{},voted_for:{},{}",
&self.local_id,
&voted_for,
¤t_term
);
let vote_info = VoteInfo::new(voted_for, current_term);
self.last_vote = vote_info;
self.notify_vote_change();
Ok(NodeManageResponse::None)
}
NodeManageRequest::SendToOtherNodes(req) => {
self.send_to_other_nodes(req, ctx);
Ok(NodeManageResponse::None)
}
}
}
}