ratchjob 0.2.1

一个rust实现的分布式任务调度平台服务。计划完全兼容xxl-job协议,然后再增强一些任务调度平台能力。
Documentation
use crate::cache::actor_model::{
    CacheManagerLocalReq, CacheManagerRaftReq, CacheManagerRaftResult, SetInfo,
};
use crate::cache::model::{CacheKey, CacheValue};
use crate::common::constant::CACHE_TABLE_NAME;
use crate::common::datetime_utils::now_second_i32;
use crate::common::pb::data_object::CacheItemDo;

use crate::raft::store::model::SnapshotRecordDto;
use crate::raft::store::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse};
use crate::raft::store::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest};
use actix::prelude::*;
use bean_factory::{bean, Inject};
use inner_mem_cache::TimeoutSet;
use quick_protobuf::{BytesReader, Writer};
use std::collections::HashMap;

pub struct CacheItem {
    pub expire: i32,
    pub value: CacheValue,
}

impl CacheItem {
    pub fn new(value: CacheValue, expire: i32) -> Self {
        CacheItem { expire, value }
    }
}

#[bean(inject)]
#[derive(Default)]
pub struct CacheManager {
    cache: HashMap<CacheKey, CacheItem>,
    time_set: TimeoutSet<CacheKey>,
}

impl CacheManager {
    pub fn new() -> Self {
        Self {
            cache: HashMap::new(),
            time_set: TimeoutSet::new(),
        }
    }

    fn clear_time_out(&mut self) {
        let now_second = now_second_i32();
        for key in self.time_set.timeout(now_second as u64) {
            if let Some(v) = self.cache.get(&key) {
                if v.expire > -1 && v.expire < now_second {
                    self.cache.remove(&key);
                }
            }
        }
    }

    fn heartbeat(&mut self, ctx: &mut Context<Self>) {
        ctx.run_later(std::time::Duration::from_secs(1), move |act, ctx| {
            act.clear_time_out();
            act.heartbeat(ctx);
        });
    }

    fn do_set(&mut self, key: CacheKey, value: CacheValue, expire: i32) {
        self.cache
            .insert(key.clone(), CacheItem::new(value, expire));
        if expire < 0 {
            //永久不过期
            return;
        }
        self.time_set.add(expire as u64, key);
    }

    fn get_valid_value(&self, key: &CacheKey) -> Option<&CacheValue> {
        if let Some(v) = self.cache.get(key) {
            if v.expire > -1 && v.expire < now_second_i32() {
                None
            } else {
                Some(&v.value)
            }
        } else {
            None
        }
    }

    fn set_nx(&mut self, key: CacheKey, value: CacheValue, expire: i32) -> CacheManagerRaftResult {
        if let Some(_v) = self.get_valid_value(&key) {
            CacheManagerRaftResult::Nil
        } else {
            self.do_set(key, value, expire);
            CacheManagerRaftResult::Ok
        }
    }

    fn set_xx(&mut self, key: CacheKey, value: CacheValue, expire: i32) -> CacheManagerRaftResult {
        if let Some(_v) = self.get_valid_value(&key) {
            self.do_set(key, value, expire);
            CacheManagerRaftResult::Ok
        } else {
            CacheManagerRaftResult::Nil
        }
    }

    fn set_value(
        &mut self,
        key: CacheKey,
        value: CacheValue,
        expire: i32,
    ) -> CacheManagerRaftResult {
        self.do_set(key, value, expire);
        CacheManagerRaftResult::Ok
    }

    fn set(&mut self, set_info: SetInfo) -> CacheManagerRaftResult {
        if set_info.nx {
            self.set_nx(set_info.key, set_info.value, set_info.ttl + set_info.now)
        } else if set_info.xx {
            self.set_xx(set_info.key, set_info.value, set_info.ttl + set_info.now)
        } else {
            self.set_value(set_info.key, set_info.value, set_info.ttl + set_info.now)
        }
    }

    fn get_set(&mut self, set_info: SetInfo) -> CacheManagerRaftResult {
        let r = if let Some(v) = self.get_valid_value(&set_info.key) {
            CacheManagerRaftResult::Value(v.clone())
        } else {
            CacheManagerRaftResult::None
        };
        self.do_set(set_info.key, set_info.value, set_info.ttl + set_info.now);
        r
    }

    fn get_value(&mut self, key: &CacheKey) -> CacheManagerRaftResult {
        if let Some(v) = self.get_valid_value(key) {
            CacheManagerRaftResult::Value(v.clone())
        } else {
            CacheManagerRaftResult::None
        }
    }

    fn remove(&mut self, key: &CacheKey) -> CacheManagerRaftResult {
        self.cache.remove(key);
        CacheManagerRaftResult::Ok
    }

    fn exists(&mut self, key: &CacheKey) -> CacheManagerRaftResult {
        if let Some(_v) = self.get_valid_value(key) {
            CacheManagerRaftResult::Exists(true)
        } else {
            CacheManagerRaftResult::Exists(false)
        }
    }

    fn expire(&mut self, key: CacheKey, expire: i32) -> CacheManagerRaftResult {
        if let Some(v) = self.get_valid_value(&key) {
            self.do_set(key, v.clone(), expire);
            CacheManagerRaftResult::Ok
        } else {
            CacheManagerRaftResult::Nil
        }
    }

    fn get_ttl(&mut self, key: &CacheKey) -> CacheManagerRaftResult {
        let t = if let Some(v) = self.cache.get(key) {
            let t = v.expire - now_second_i32();
            if t > -2 {
                t
            } else {
                -2
            }
        } else {
            -2
        };
        CacheManagerRaftResult::Ttl(t)
    }

    fn incr(&mut self, key: CacheKey, expire: i32) -> CacheManagerRaftResult {
        if let Some(v) = self.get_valid_value(&key) {
            if let Some(v) = v.try_to_number() {
                if v == i64::MAX {
                    return CacheManagerRaftResult::Nil;
                }
                let value = CacheValue::Number(v + 1);
                self.do_set(key, value.clone(), expire);
                CacheManagerRaftResult::Value(value)
            } else {
                CacheManagerRaftResult::Nil
            }
        } else {
            let value = CacheValue::Number(1);
            self.do_set(key, value.clone(), expire);
            CacheManagerRaftResult::Value(value)
        }
    }

    fn decr(&mut self, key: CacheKey, expire: i32) -> CacheManagerRaftResult {
        if let Some(v) = self.get_valid_value(&key) {
            if let Some(v) = v.try_to_number() {
                if v == i64::MIN {
                    return CacheManagerRaftResult::Nil;
                }
                let value = CacheValue::Number(v - 1);
                self.do_set(key, value.clone(), expire);
                CacheManagerRaftResult::Value(value)
            } else {
                CacheManagerRaftResult::Nil
            }
        } else {
            let value = CacheValue::Number(-1);
            self.do_set(key, value.clone(), expire);
            CacheManagerRaftResult::Value(value)
        }
    }

    fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> {
        let now = now_second_i32();
        for (key, v) in self.cache.iter() {
            if v.expire > -1 && v.expire < now {
                continue;
            }
            let mut buf = Vec::new();
            {
                let mut writer = Writer::new(&mut buf);
                let value_do = v.value.to_do(key);
                writer.write_message(&value_do)?;
            }
            let record = SnapshotRecordDto {
                tree: CACHE_TABLE_NAME.clone(),
                key: key.to_key_string().into_bytes(),
                value: buf,
                op_type: 0,
            };
            writer.do_send(SnapshotWriterRequest::Record(record));
        }
        Ok(())
    }

    fn load_snapshot_record(&mut self, record: SnapshotRecordDto) -> anyhow::Result<()> {
        let mut reader = BytesReader::from_bytes(&record.value);
        let value_do: CacheItemDo = reader.read_message(&record.value)?;
        let key = CacheKey::from_db_key(record.key)?;
        let value = CacheValue::from_bytes(&value_do.data, key.cache_type.clone())?;
        self.do_set(key, value, value_do.timeout);
        Ok(())
    }

    fn load_completed(&mut self, _ctx: &mut Context<Self>) -> anyhow::Result<()> {
        Ok(())
    }
}

impl Actor for CacheManager {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        log::info!("CacheManager actor started");
        ctx.run_later(std::time::Duration::from_millis(500), |act, ctx| {
            act.heartbeat(ctx);
        });
    }
}

impl Inject for CacheManager {
    type Context = Context<Self>;

    fn inject(
        &mut self,
        _factory_data: bean_factory::FactoryData,
        _factory: bean_factory::BeanFactory,
        _ctx: &mut Self::Context,
    ) {
    }
}

impl Handler<CacheManagerLocalReq> for CacheManager {
    type Result = anyhow::Result<CacheManagerRaftResult>;

    fn handle(&mut self, req: CacheManagerLocalReq, _ctx: &mut Self::Context) -> Self::Result {
        match req {
            CacheManagerLocalReq::Get(key) => Ok(self.get_value(&key)),
            CacheManagerLocalReq::Exists(key) => Ok(self.exists(&key)),
            CacheManagerLocalReq::Ttl(key) => Ok(self.get_ttl(&key)),
        }
    }
}

impl Handler<CacheManagerRaftReq> for CacheManager {
    type Result = anyhow::Result<CacheManagerRaftResult>;

    fn handle(&mut self, req: CacheManagerRaftReq, _ctx: &mut Self::Context) -> Self::Result {
        match req {
            CacheManagerRaftReq::Set(set_info) => Ok(self.set(set_info)),
            CacheManagerRaftReq::GetSet(set_info) => Ok(self.get_set(set_info)),
            CacheManagerRaftReq::Get(key) => Ok(self.get_value(&key)),
            CacheManagerRaftReq::Remove(key) => Ok(self.remove(&key)),
            CacheManagerRaftReq::Exists(key) => Ok(self.exists(&key)),
            CacheManagerRaftReq::Expire(key, expire) => Ok(self.expire(key, expire)),
            CacheManagerRaftReq::Ttl(key) => Ok(self.get_ttl(&key)),
            CacheManagerRaftReq::Incr(key, expire) => Ok(self.incr(key, expire)),
            CacheManagerRaftReq::Decr(key, expire) => Ok(self.decr(key, expire)),
        }
    }
}

impl Handler<RaftApplyDataRequest> for CacheManager {
    type Result = anyhow::Result<RaftApplyDataResponse>;

    fn handle(&mut self, msg: RaftApplyDataRequest, ctx: &mut Self::Context) -> Self::Result {
        match msg {
            RaftApplyDataRequest::BuildSnapshot(writer) => {
                self.build_snapshot(writer)?;
            }
            RaftApplyDataRequest::LoadSnapshotRecord(record) => {
                self.load_snapshot_record(record)?;
            }
            RaftApplyDataRequest::LoadCompleted => {
                self.load_completed(ctx)?;
            }
        }
        Ok(RaftApplyDataResponse::None)
    }
}