ratchjob 0.2.1

一个使用 Rust 实现的分布式任务调度平台服务。计划完全兼容 xxl-job 协议,并在此基础上增强任务调度平台的能力。
Documentation
use crate::common::app_config::AppConfig;
use crate::common::datetime_utils::now_millis;
use crate::metrics::collect::MetricsActorCollect;
use crate::metrics::counter::CounterManager;
use crate::metrics::gauge::GaugeManager;
use crate::metrics::histogram::HistogramManager;
use crate::metrics::metrics_key::MetricsKey;
use crate::metrics::model::{MetricsItem, MetricsRecord, MetricsRequest, MetricsResponse};
use crate::metrics::summary::SummaryManager;
use crate::metrics::timeline::core::MetricsTimelineManager;
use crate::metrics::timeline::model::{MetricsSnapshot, TimelineGroupType};
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use bytes::BytesMut;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::{Pid, System};

/// Actor that aggregates counters, gauges, histograms and summaries, periodically
/// samples process-level system metrics, and records timeline snapshots.
#[bean(inject)]
#[derive(Debug)]
pub struct MetricsManager {
    /// Counter metrics; the known keys are pre-registered with 0 by `init_counter_manager`.
    counter_manager: CounterManager,
    /// Gauge metrics, including the system/process gauges written by `load_sys_metrics`.
    gauge_manager: GaugeManager,
    /// Histogram metrics; buckets are registered in `init_histogram`.
    histogram_manager: HistogramManager,
    /// Summary metrics; recalculated from histograms by `reset_summary`.
    summary_manager: SummaryManager,
    /// `(summary key, histogram key)` pairs: which histogram feeds which summary.
    summary_key_config: Vec<(MetricsKey, MetricsKey)>,
    /// Stores periodic snapshots and answers `MetricsRequest::TimelineQuery`.
    metrics_timeline_manager: MetricsTimelineManager,
    /// `sysinfo` handle, refreshed on every collection round.
    system: System,
    /// Id of the current process (`std::process::id()`), used to look it up in `system`.
    current_process_id: u32,
    /// `now_millis()` taken when the manager was constructed.
    start_time_millis: u64,
    /// `system.total_memory()` divided by 1024 * 1024 at construction time
    /// (presumably MiB, assuming sysinfo reports bytes — TODO confirm for the pinned version).
    total_memory: f32,
    /// `now_millis()` at the end of the last `load_sys_metrics` call; 0 before the first one.
    last_collect_time: u64,
    /// `now_millis()` of the last metrics log output; 0 before the first one.
    last_log_time: u64,
    /// Shared application configuration (metrics switches and intervals are read from it).
    app_sys_config: Arc<AppConfig>,
    /// Collector resolved from the bean factory in `inject`; `None` until then.
    metrics_collect: Option<Arc<MetricsActorCollect>>,
}

impl MetricsManager {
    pub fn new(app_sys_config: Arc<AppConfig>) -> Self {
        let current_process_id = std::process::id();
        let start_time_millis = now_millis();
        let mut system = System::new();
        system.refresh_memory();
        let total_memory = system.total_memory() as f32 / (1024.0 * 1024.0);
        let mut gauge_manager = GaugeManager::default();
        gauge_manager.set(MetricsKey::SysTotalMemory, total_memory);
        let counter_manager = Self::init_counter_manager();
        Self {
            counter_manager,
            gauge_manager,
            histogram_manager: Default::default(),
            summary_manager: Default::default(),
            summary_key_config: Default::default(),
            metrics_timeline_manager: MetricsTimelineManager::new(),
            system,
            current_process_id,
            start_time_millis,
            total_memory,
            last_collect_time: 0,
            last_log_time: 0,
            app_sys_config,
            metrics_collect: None,
        }
    }

    /// Builds a `CounterManager` in which every known counter key is set to 0.
    fn init_counter_manager() -> CounterManager {
        let mut counters = CounterManager::default();
        // Registration order is kept identical to the historical one.
        for key in [
            MetricsKey::TaskTriggerSize,
            MetricsKey::TaskTriggerFinishSize,
            MetricsKey::TaskRedoSize,
            MetricsKey::TaskSuccessSize,
            MetricsKey::TaskFailSize,
            MetricsKey::TaskCallApiSize,
            MetricsKey::TaskFinishTotalCount,
            MetricsKey::HttpRequestTotalCount,
        ] {
            counters.absolute(key, 0);
        }
        counters
    }

    /// Registers histograms/summaries and schedules the first collection round.
    fn init(&mut self, ctx: &mut Context<Self>) {
        // Register buckets and quantiles first ...
        self.init_histogram();
        // ... then arm the timer that drives `load_metrics`.
        self.hb(ctx);
    }

    /// Registers the response-time histograms and summaries, and records which
    /// histogram each summary is recalculated from.
    fn init_histogram(&mut self) {
        // Quantiles used by both response-time summaries.
        const RT_QUANTILES: [f32; 7] = [0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 1.0];
        // Task finish response-time bucket bounds. The original note gives the unit as
        // milliseconds. NOTE(review): 60/600/3600/86400 look like seconds — confirm.
        const TASK_FINISH_RT_BUCKETS: [f32; 9] =
            [0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 600.0, 3600.0, 86400.0];
        // HTTP request handling response-time bucket bounds (unit: milliseconds per the
        // original note).
        const HTTP_HANDLE_RT_BUCKETS: [f32; 11] = [
            0.25, 0.5, 1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 100.0, 300.0, 500.0,
        ];

        self.histogram_manager
            .init(MetricsKey::TaskFinishRtHistogram, &TASK_FINISH_RT_BUCKETS);
        self.summary_manager
            .init(MetricsKey::TaskFinishRtSummary, &RT_QUANTILES);

        self.histogram_manager.init(
            MetricsKey::HttpRequestHandleRtHistogram,
            &HTTP_HANDLE_RT_BUCKETS,
        );
        self.summary_manager
            .init(MetricsKey::HttpRequestHandleRtSummary, &RT_QUANTILES);

        // Summary <- histogram mapping consumed by `reset_summary`.
        self.summary_key_config.extend([
            (
                MetricsKey::HttpRequestHandleRtSummary,
                MetricsKey::HttpRequestHandleRtHistogram,
            ),
            (
                MetricsKey::TaskFinishRtSummary,
                MetricsKey::TaskFinishRtHistogram,
            ),
        ]);
    }

    /// Recalculates every configured summary from the current value of its histogram.
    ///
    /// Pairs whose histogram has no value are skipped.
    fn reset_summary(&mut self) {
        for (summary_key, histogram_key) in self.summary_key_config.iter() {
            match self.histogram_manager.get_value(histogram_key) {
                Some(histogram_value) => {
                    self.summary_manager
                        .recalculate_from_histogram(summary_key, histogram_value);
                }
                None => {}
            }
        }
    }

    /// Asks the collector (if any) for its pending metric items.
    ///
    /// Returns an empty list when no collector is available.
    async fn do_peek_metrics(
        metrics_actor_collect: Option<Arc<MetricsActorCollect>>,
    ) -> anyhow::Result<Vec<MetricsItem>> {
        match metrics_actor_collect {
            Some(collector) => collector.peek_metrics().await,
            None => Ok(Vec::new()),
        }
    }

    fn update_peek_metrics(&mut self, r: anyhow::Result<Vec<MetricsItem>>) {
        if let Ok(list) = r {
            for item in list {
                self.update_item_record(item);
            }
        }
    }

    fn update_item_record(&mut self, item: MetricsItem) {
        match item.record {
            MetricsRecord::CounterInc(v) => self.counter_manager.increment(item.metrics_type, v),
            MetricsRecord::Gauge(v) => self.gauge_manager.set(item.metrics_type, v),
            MetricsRecord::HistogramRecord(v) => {
                self.histogram_manager.record(&item.metrics_type, v)
            }
            MetricsRecord::HistogramRecords(batch_value) => self
                .histogram_manager
                .record_many(&item.metrics_type, &batch_value),
        }
    }

    /// Writes all metric families to the log, rate-limited by the configured interval.
    ///
    /// Does nothing when `metrics_log_enable` is false, or when less than
    /// `metrics_log_interval_second - 1` seconds have passed since the previous output.
    fn print_metrics(&mut self) {
        // Metric logging is switched off.
        if !self.app_sys_config.metrics_log_enable {
            return;
        }
        let now = now_millis();
        // Saturating arithmetic: an interval of 0 must not underflow (which would panic in
        // debug builds and wrap to a huge threshold in release builds), and a wall clock
        // that stepped backwards must not underflow the elapsed time either.
        let min_gap_millis = self
            .app_sys_config
            .metrics_log_interval_second
            .saturating_sub(1)
            .saturating_mul(1000);
        if now.saturating_sub(self.last_log_time) < min_gap_millis {
            return;
        }
        self.print_sys_metrics();
        self.gauge_manager.print_metrics();
        self.counter_manager.print_metrics();
        self.histogram_manager.print_metrics();
        self.summary_manager.print_metrics();
        self.last_log_time = now;
    }

    /// Refreshes system information and stores the current process's CPU and memory
    /// figures in the gauge manager, then stamps `last_collect_time`.
    ///
    /// If the current process cannot be found in the refreshed data, the gauges are left
    /// untouched and only `last_collect_time` is updated.
    fn load_sys_metrics(&mut self) {
        self.system.refresh_all();
        if let Some(process) = self.system.process(Pid::from_u32(self.current_process_id)) {
            let cpu_usage = process.cpu_usage();
            // Same divisor as the one applied to `total_memory` in `new`.
            let rss = process.memory() as f32 / (1024.0 * 1024.0);
            let vms = process.virtual_memory() as f32 / (1024.0 * 1024.0);
            // Guard against a zero total (would yield inf/NaN in the exported gauge).
            let rss_usage = if self.total_memory > 0.0 {
                rss / self.total_memory * 100.0
            } else {
                0.0
            };
            // Wall-clock time may step backwards; saturate instead of underflowing.
            let running_seconds = now_millis().saturating_sub(self.start_time_millis) / 1000;
            // Despite its name, this key is fed the elapsed running time in seconds.
            self.gauge_manager
                .set(MetricsKey::ProcessStartTimeSeconds, running_seconds as f32);
            self.gauge_manager.set(MetricsKey::AppCpuUsage, cpu_usage);
            self.gauge_manager.set(MetricsKey::AppRssMemory, rss);
            self.gauge_manager.set(MetricsKey::AppVmsMemory, vms);
            self.gauge_manager
                .set(MetricsKey::AppMemoryUsage, rss_usage);
        }
        self.last_collect_time = now_millis();
    }

    fn print_sys_metrics(&self) {
        let cpu_usage = self
            .gauge_manager
            .value(&MetricsKey::AppCpuUsage)
            .unwrap_or_default();
        let rss = self
            .gauge_manager
            .value(&MetricsKey::AppRssMemory)
            .unwrap_or_default();
        let vms = self
            .gauge_manager
            .value(&MetricsKey::AppVmsMemory)
            .unwrap_or_default();
        let rss_usage = self
            .gauge_manager
            .value(&MetricsKey::AppMemoryUsage)
            .unwrap_or_default();
        let running_seconds = self
            .gauge_manager
            .value(&MetricsKey::ProcessStartTimeSeconds)
            .unwrap_or_default();
        log::info!("[metrics_system]|already running seconds: {}s|cpu_usage: {:.2}%|rss_usage: {:.2}%|rss: {:.2}M|vms: {:.2}M|total_memory: {:.2}M|",running_seconds,&cpu_usage,&rss_usage,&rss,&vms,&self.total_memory);
    }

    /// Post-processing for one collection round: refresh summaries and system gauges,
    /// optionally log, then offer a snapshot to each timeline group.
    fn after_peek_metrics(&mut self) {
        self.reset_summary();
        self.load_sys_metrics();
        self.print_metrics();

        // All timeline groups are evaluated against the same timestamp.
        let now = now_millis();
        for group_type in [
            TimelineGroupType::Least,
            TimelineGroupType::Minute,
            TimelineGroupType::Hour,
        ] {
            self.record_timeline_snapshot(now, group_type);
        }
    }

    /// Runs one asynchronous collection round on the actor context and, once it has
    /// completed, applies the result and schedules the next round.
    fn load_metrics(&mut self, ctx: &mut Context<Self>) {
        let collector = self.metrics_collect.clone();
        let peek_task = Self::do_peek_metrics(collector).into_actor(self);
        peek_task
            .map(|result, act, ctx| {
                act.update_peek_metrics(result);
                act.after_peek_metrics();
                // Re-arm the timer only after this round is fully processed.
                act.hb(ctx);
            })
            .spawn(ctx);
    }

    /// Clones the current gauge, counter and histogram data into a snapshot stamped
    /// with `now_ms`.
    fn build_snapshot(&self, now_ms: u64) -> MetricsSnapshot {
        let gauge_data_map = self.gauge_manager.data_map.clone();
        let counter_data_map = self.counter_manager.data_map.clone();
        let histogram_data_map = self.histogram_manager.data_map.clone();
        MetricsSnapshot {
            gauge_data_map,
            counter_data_map,
            histogram_data_map,
            snapshot_time: now_ms,
        }
    }

    /// Adds a snapshot to `group_type`'s timeline when its interval has elapsed since
    /// the last recorded snapshot.
    fn record_timeline_snapshot(&mut self, now_ms: u64, group_type: TimelineGroupType) {
        // 200 ms of slack is added to "now" so that a round arriving slightly early
        // still counts as having passed the interval.
        let due_mark = now_ms + 200 - group_type.get_interval_millis();
        let last_record_time = self
            .metrics_timeline_manager
            .get_last_record_time(&group_type);
        if due_mark <= last_record_time {
            return;
        }
        let snapshot = self.build_snapshot(now_ms);
        self.metrics_timeline_manager
            .add_record(&group_type, snapshot);
    }

    /// Schedules the next collection round after `metrics_collect_interval_second` seconds.
    fn hb(&mut self, ctx: &mut Context<Self>) {
        let delay = Duration::from_secs(self.app_sys_config.metrics_collect_interval_second);
        ctx.run_later(delay, |act, ctx| act.load_metrics(ctx));
    }

    /// Serialises counters, gauges, histograms and summaries (in that order) to a string.
    ///
    /// Summaries are recalculated from their histograms right before they are written.
    ///
    /// # Errors
    /// Returns an error if any manager fails to export or the buffer is not valid UTF-8.
    fn export(&mut self) -> anyhow::Result<String> {
        let mut buffer = BytesMut::new();
        self.counter_manager.export(&mut buffer)?;
        self.gauge_manager.export(&mut buffer)?;
        self.histogram_manager.export(&mut buffer)?;

        self.reset_summary();
        self.summary_manager.export(&mut buffer)?;

        let text = String::from_utf8(buffer.to_vec())?;
        Ok(text)
    }
}

/// Actor lifecycle: only logs on start. Collection is kicked off from `Inject::inject`.
impl Actor for MetricsManager {
    type Context = Context<Self>;

    /// Logs that the actor has started; the context is not used here.
    fn started(&mut self, _ctx: &mut Self::Context) {
        log::info!("MetricsManager started");
    }
}

/// Bean-factory hook: resolves the collector and starts collection when enabled.
impl Inject for MetricsManager {
    type Context = Context<Self>;

    /// Resolves `metrics_collect` from the factory, sets the timeline's least interval
    /// from the collect interval, and starts the collection loop if `metrics_enable` is set.
    fn inject(
        &mut self,
        factory_data: FactoryData,
        _factory: BeanFactory,
        ctx: &mut Self::Context,
    ) {
        self.metrics_collect = MetricsActorCollect::from_factory(&factory_data);

        let collect_interval = self.app_sys_config.metrics_collect_interval_second;
        self.metrics_timeline_manager
            .set_least_interval(collect_interval);

        if !self.app_sys_config.metrics_enable {
            log::info!("metrics disable!");
            return;
        }

        log::info!(
            "metrics enable! log_interval: {}s",
            self.app_sys_config.metrics_log_interval_second
        );
        self.init(ctx);
    }
}

/// Message handling for recording, exporting and querying metrics.
impl Handler<MetricsRequest> for MetricsManager {
    type Result = anyhow::Result<MetricsResponse>;

    /// Handles one `MetricsRequest`.
    ///
    /// When metrics are disabled every request is answered with `MetricsResponse::None`
    /// without being processed.
    fn handle(&mut self, msg: MetricsRequest, _ctx: &mut Self::Context) -> Self::Result {
        if !self.app_sys_config.metrics_enable {
            return Ok(MetricsResponse::None);
        }

        let response = match msg {
            MetricsRequest::Record(item) => {
                self.update_item_record(item);
                MetricsResponse::None
            }
            MetricsRequest::BatchRecord(items) => {
                for item in items {
                    self.update_item_record(item);
                }
                MetricsResponse::None
            }
            MetricsRequest::Export => MetricsResponse::ExportInfo(self.export()?),
            MetricsRequest::TimelineQuery(param) => {
                MetricsResponse::TimelineResponse(self.metrics_timeline_manager.query(param))
            }
        };
        Ok(response)
    }
}