ratchjob 0.2.1

一个rust实现的分布式任务调度平台服务。计划完全兼容xxl-job协议,然后再增强一些任务调度平台能力。
Documentation
use std::sync::Arc;

use crate::common::app_config::AppConfig;
use crate::grpc::handler::CLUSTER_TOKEN;
use crate::grpc::ratch_server_proto;
use crate::grpc::ratch_server_proto::request_client::RequestClient;
use crate::grpc::ratch_server_proto::Payload;
use actix::prelude::*;
use inner_mem_cache::MemCache;
use tonic::transport::Channel;

#[derive(Debug)]
pub struct RaftClusterRequestSender {
    conn_factory: Addr<RaftConnectionFactory>,
    app_config: Arc<AppConfig>,
}

impl RaftClusterRequestSender {
    pub fn new(conn_factory: Addr<RaftConnectionFactory>, sys_config: Arc<AppConfig>) -> Self {
        Self {
            conn_factory,
            app_config: sys_config,
        }
    }

    pub async fn get_node_channel(&self, addr: Arc<String>) -> anyhow::Result<Arc<Channel>> {
        let res: RaftConnResponse = self
            .conn_factory
            .send(RaftConnRequest::GetChannel(addr))
            .await??;
        match res {
            RaftConnResponse::Channel(channel) => Ok(channel),
            RaftConnResponse::None => Err(anyhow::anyhow!("get raft conn error")),
        }
    }

    pub async fn send_request(
        &self,
        addr: Arc<String>,
        mut payload: Payload,
    ) -> anyhow::Result<Payload> {
        let channel = self.get_node_channel(addr.clone()).await?;
        let mut request_client = RequestClient::new(channel.as_ref().clone());
        if !self.app_config.cluster_token.is_empty() {
            payload.headers.insert(
                CLUSTER_TOKEN.to_string(),
                self.app_config.cluster_token.as_str().to_string(),
            );
        }
        let resp = match request_client.request(payload).await {
            Ok(resp) => {
                self.conn_factory.do_send(RaftConnRequest::UpdateChannel {
                    key: addr,
                    is_active: true,
                });
                resp
            }
            Err(err) => {
                self.conn_factory.do_send(RaftConnRequest::UpdateChannel {
                    key: addr,
                    is_active: false,
                });
                return Err(err.into());
            }
        };
        let response: ratch_server_proto::Response = resp.into_inner();
        if response.code != 0 {
            return Err(anyhow::anyhow!(
                "raft target response error,{}",
                response.message
            ));
        }
        if let Some(payload) = response.data {
            Ok(payload)
        } else {
            Err(anyhow::anyhow!("raft target response error"))
        }
    }
}

pub struct RaftConnectionFactory {
    channel_cache: MemCache<Arc<String>, Arc<Channel>>,
    cache_ses: i32,
}

impl RaftConnectionFactory {
    pub fn new(cache_ses: i32) -> Self {
        Self {
            channel_cache: MemCache::<Arc<String>, Arc<Channel>>::new(),
            cache_ses,
        }
    }

    fn build_channel(&mut self, key: Arc<String>) -> anyhow::Result<Arc<Channel>> {
        self.channel_cache.clear_time_out();
        if let Ok(channel) = self.channel_cache.get(&key) {
            Ok(channel)
        } else {
            let addr = format!("http://{}", &key);
            let channel = Arc::new(Channel::from_shared(addr)?.connect_lazy());
            self.channel_cache.set(key, channel.clone(), self.cache_ses);
            Ok(channel)
        }
    }

    fn update_channel_status(&mut self, key: Arc<String>, is_active: bool) {
        if let Ok(channel) = self.channel_cache.get(&key) {
            if is_active {
                let tll = self.channel_cache.time_to_live(&key);
                if tll < 10 {
                    self.channel_cache.set(key, channel, self.cache_ses);
                }
            } else {
                self.channel_cache.remove(&key);
            }
        }
    }
}

impl Actor for RaftConnectionFactory {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        log::info!("ConnectionFactory started")
    }
}

#[derive(Message, Debug)]
#[rtype(result = "anyhow::Result<RaftConnResponse>")]
pub enum RaftConnRequest {
    GetChannel(Arc<String>),
    UpdateChannel { key: Arc<String>, is_active: bool },
}

pub enum RaftConnResponse {
    Channel(Arc<Channel>),
    None,
}

impl Handler<RaftConnRequest> for RaftConnectionFactory {
    type Result = anyhow::Result<RaftConnResponse>;

    fn handle(&mut self, msg: RaftConnRequest, _ctx: &mut Self::Context) -> Self::Result {
        match msg {
            RaftConnRequest::GetChannel(key) => {
                let c = self.build_channel(key)?;
                Ok(RaftConnResponse::Channel(c))
            }
            RaftConnRequest::UpdateChannel { key, is_active } => {
                self.update_channel_status(key, is_active);
                Ok(RaftConnResponse::None)
            }
        }
    }
}