use crate::common::app_config::AppConfig;
use crate::common::datetime_utils::now_millis;
use crate::metrics::collect::MetricsActorCollect;
use crate::metrics::counter::CounterManager;
use crate::metrics::gauge::GaugeManager;
use crate::metrics::histogram::HistogramManager;
use crate::metrics::metrics_key::MetricsKey;
use crate::metrics::model::{MetricsItem, MetricsRecord, MetricsRequest, MetricsResponse};
use crate::metrics::summary::SummaryManager;
use crate::metrics::timeline::core::MetricsTimelineManager;
use crate::metrics::timeline::model::{MetricsSnapshot, TimelineGroupType};
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use bytes::BytesMut;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::{Pid, System};
#[bean(inject)]
#[derive(Debug)]
/// Actor that owns the in-process metric stores (counters, gauges, histograms,
/// summaries), periodically pulls metrics from an optional collector, samples
/// process-level system metrics, and records timeline snapshots.
pub struct MetricsManager {
// Counter store; a fixed set of keys is seeded to 0 by `init_counter_manager`.
counter_manager: CounterManager,
// Gauge store; also holds the system/process gauges written by `load_sys_metrics`.
gauge_manager: GaugeManager,
// Histogram store; buckets are registered in `init_histogram`.
histogram_manager: HistogramManager,
// Summary store; recalculated from the paired histograms in `reset_summary`.
summary_manager: SummaryManager,
// Pairs of (summary key, histogram key) telling `reset_summary` which
// histogram each summary is derived from.
summary_key_config: Vec<(MetricsKey, MetricsKey)>,
// Receives snapshots of the gauge/counter/histogram maps per timeline group.
metrics_timeline_manager: MetricsTimelineManager,
// `sysinfo` handle used to read memory and process statistics.
system: System,
// Id of the current process, taken from `std::process::id()` in `new`.
current_process_id: u32,
// `now_millis()` captured in `new`; used to derive the running time.
start_time_millis: u64,
// `System::total_memory()` divided by 1024 * 1024 (logged with an "M" suffix).
total_memory: f32,
// `now_millis()` at the end of the most recent `load_sys_metrics` call (0 initially).
last_collect_time: u64,
// Timestamp of the most recent `print_metrics` output (0 initially).
last_log_time: u64,
// Shared application configuration (enable flags and intervals are read from it).
app_sys_config: Arc<AppConfig>,
// Optional metrics collector; `None` until `inject` resolves it from the factory data.
metrics_collect: Option<Arc<MetricsActorCollect>>,
}
impl MetricsManager {
pub fn new(app_sys_config: Arc<AppConfig>) -> Self {
let current_process_id = std::process::id();
let start_time_millis = now_millis();
let mut system = System::new();
system.refresh_memory();
let total_memory = system.total_memory() as f32 / (1024.0 * 1024.0);
let mut gauge_manager = GaugeManager::default();
gauge_manager.set(MetricsKey::SysTotalMemory, total_memory);
let counter_manager = Self::init_counter_manager();
Self {
counter_manager,
gauge_manager,
histogram_manager: Default::default(),
summary_manager: Default::default(),
summary_key_config: Default::default(),
metrics_timeline_manager: MetricsTimelineManager::new(),
system,
current_process_id,
start_time_millis,
total_memory,
last_collect_time: 0,
last_log_time: 0,
app_sys_config,
metrics_collect: None,
}
}
/// Creates the counter store with every known counter key set to an
/// absolute value of 0, so the keys exist before any increment arrives.
fn init_counter_manager() -> CounterManager {
    let mut counters = CounterManager::default();
    // Same keys, same order as they are registered one by one.
    for key in [
        MetricsKey::TaskTriggerSize,
        MetricsKey::TaskTriggerFinishSize,
        MetricsKey::TaskRedoSize,
        MetricsKey::TaskSuccessSize,
        MetricsKey::TaskFailSize,
        MetricsKey::TaskCallApiSize,
        MetricsKey::TaskFinishTotalCount,
        MetricsKey::HttpRequestTotalCount,
    ] {
        counters.absolute(key, 0);
    }
    counters
}
/// Starts metrics processing: registers the histogram/summary definitions
/// and then schedules the first collection tick on the actor context.
fn init(&mut self, ctx: &mut Context<Self>) {
    // Definitions must exist before the first tick records into them.
    self.init_histogram();
    self.hb(ctx);
}
/// Registers the response-time histograms and summaries for task completion
/// and HTTP request handling, and records which histogram backs each summary.
fn init_histogram(&mut self) {
    // Bucket boundaries for task completion time (unit not visible here).
    const TASK_RT_BUCKETS: [f32; 9] = [
        0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 600.0, 3600.0, 86400.0,
    ];
    // Bucket boundaries for HTTP handling time (unit not visible here).
    const HTTP_RT_BUCKETS: [f32; 11] = [
        0.25, 0.5, 1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 100.0, 300.0, 500.0,
    ];
    // Both summaries are initialised with the same value list.
    const SUMMARY_BOUNDS: [f32; 7] = [0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 1.0];

    self.histogram_manager
        .init(MetricsKey::TaskFinishRtHistogram, &TASK_RT_BUCKETS);
    self.histogram_manager
        .init(MetricsKey::HttpRequestHandleRtHistogram, &HTTP_RT_BUCKETS);

    self.summary_manager
        .init(MetricsKey::TaskFinishRtSummary, &SUMMARY_BOUNDS);
    self.summary_manager
        .init(MetricsKey::HttpRequestHandleRtSummary, &SUMMARY_BOUNDS);

    // (summary, histogram) pairs consumed by `reset_summary`; HTTP first, then task.
    self.summary_key_config.extend([
        (
            MetricsKey::HttpRequestHandleRtSummary,
            MetricsKey::HttpRequestHandleRtHistogram,
        ),
        (
            MetricsKey::TaskFinishRtSummary,
            MetricsKey::TaskFinishRtHistogram,
        ),
    ]);
}
/// Recomputes every configured summary from the current value of its paired
/// histogram. Pairs whose histogram has no value are skipped.
fn reset_summary(&mut self) {
    for pair in self.summary_key_config.iter() {
        let (summary_key, histogram_key) = pair;
        let current = self.histogram_manager.get_value(histogram_key);
        if let Some(value) = current {
            self.summary_manager
                .recalculate_from_histogram(summary_key, value);
        }
    }
}
/// Asks the collector, if one is configured, for its pending metric items.
///
/// Returns an empty list when no collector is present; otherwise forwards the
/// collector's result, including any error.
async fn do_peek_metrics(
    metrics_actor_collect: Option<Arc<MetricsActorCollect>>,
) -> anyhow::Result<Vec<MetricsItem>> {
    match metrics_actor_collect {
        Some(collector) => collector.peek_metrics().await,
        None => Ok(Vec::new()),
    }
}
fn update_peek_metrics(&mut self, r: anyhow::Result<Vec<MetricsItem>>) {
if let Ok(list) = r {
for item in list {
self.update_item_record(item);
}
}
}
/// Routes one metric item to the store matching its record kind:
/// counter increment, gauge set, single histogram sample, or a batch of
/// histogram samples.
fn update_item_record(&mut self, item: MetricsItem) {
    let key = item.metrics_type;
    match item.record {
        MetricsRecord::CounterInc(delta) => {
            self.counter_manager.increment(key, delta);
        }
        MetricsRecord::Gauge(value) => {
            self.gauge_manager.set(key, value);
        }
        MetricsRecord::HistogramRecord(sample) => {
            self.histogram_manager.record(&key, sample);
        }
        MetricsRecord::HistogramRecords(samples) => {
            self.histogram_manager.record_many(&key, &samples);
        }
    }
}
/// Writes all metric families to the log, rate-limited by the configured
/// log interval.
///
/// Does nothing when `metrics_log_enable` is off. Otherwise output is emitted
/// only if at least `metrics_log_interval_second - 1` seconds have passed
/// since the previous output (the one-second slack is kept as-is; presumably
/// it absorbs timer jitter).
///
/// The interval arithmetic is underflow-safe: an interval of 0 means "no
/// minimum gap", and a clock that moved backwards is treated as "interval
/// elapsed" so logging resynchronises instead of panicking.
fn print_metrics(&mut self) {
    if !self.app_sys_config.metrics_log_enable {
        return;
    }
    let now = now_millis();
    let min_gap_millis = self
        .app_sys_config
        .metrics_log_interval_second
        .saturating_sub(1)
        .saturating_mul(1000);
    // `checked_sub` is `None` when `now` is earlier than the last log time.
    if let Some(elapsed) = now.checked_sub(self.last_log_time) {
        if elapsed < min_gap_millis {
            return;
        }
    }
    self.print_sys_metrics();
    self.gauge_manager.print_metrics();
    self.counter_manager.print_metrics();
    self.histogram_manager.print_metrics();
    self.summary_manager.print_metrics();
    self.last_log_time = now;
}
/// Refreshes `sysinfo` data and publishes the current process' CPU usage,
/// resident and virtual memory (divided by 1024 * 1024), memory usage
/// percentage and running time as gauges, then stamps `last_collect_time`.
///
/// If the current process cannot be found, only the timestamp is updated.
/// The memory percentage is reported as 0 when total memory is not positive
/// (avoids publishing inf/NaN), and the running time is clamped at 0 if the
/// clock is earlier than the recorded start time.
fn load_sys_metrics(&mut self) {
    self.system.refresh_all();
    if let Some(process) = self.system.process(Pid::from_u32(self.current_process_id)) {
        let cpu_usage = process.cpu_usage();
        let rss = process.memory() as f32 / (1024.0 * 1024.0);
        let vms = process.virtual_memory() as f32 / (1024.0 * 1024.0);
        // Guard the division: a zero total would yield inf or NaN.
        let rss_usage = if self.total_memory > 0.0 {
            rss / self.total_memory * 100.0
        } else {
            0.0
        };
        // Saturate so a backwards clock step cannot underflow the subtraction.
        let running_seconds = now_millis().saturating_sub(self.start_time_millis) / 1000;
        // NOTE(review): this key is fed the running duration, not a start timestamp.
        self.gauge_manager
            .set(MetricsKey::ProcessStartTimeSeconds, running_seconds as f32);
        self.gauge_manager.set(MetricsKey::AppCpuUsage, cpu_usage);
        self.gauge_manager.set(MetricsKey::AppRssMemory, rss);
        self.gauge_manager.set(MetricsKey::AppVmsMemory, vms);
        self.gauge_manager
            .set(MetricsKey::AppMemoryUsage, rss_usage);
    }
    self.last_collect_time = now_millis();
}
fn print_sys_metrics(&self) {
let cpu_usage = self
.gauge_manager
.value(&MetricsKey::AppCpuUsage)
.unwrap_or_default();
let rss = self
.gauge_manager
.value(&MetricsKey::AppRssMemory)
.unwrap_or_default();
let vms = self
.gauge_manager
.value(&MetricsKey::AppVmsMemory)
.unwrap_or_default();
let rss_usage = self
.gauge_manager
.value(&MetricsKey::AppMemoryUsage)
.unwrap_or_default();
let running_seconds = self
.gauge_manager
.value(&MetricsKey::ProcessStartTimeSeconds)
.unwrap_or_default();
log::info!("[metrics_system]|already running seconds: {}s|cpu_usage: {:.2}%|rss_usage: {:.2}%|rss: {:.2}M|vms: {:.2}M|total_memory: {:.2}M|",running_seconds,&cpu_usage,&rss_usage,&rss,&vms,&self.total_memory);
}
/// Post-processing after a collector peek: refresh summaries, sample system
/// metrics, optionally log, then offer a snapshot to each timeline group
/// using one shared timestamp.
fn after_peek_metrics(&mut self) {
    self.reset_summary();
    self.load_sys_metrics();
    self.print_metrics();

    let now = now_millis();
    for group_type in [
        TimelineGroupType::Least,
        TimelineGroupType::Minute,
        TimelineGroupType::Hour,
    ] {
        self.record_timeline_snapshot(now, group_type);
    }
}
/// Runs one collection cycle: spawns a future on the actor context that peeks
/// the collector, and on completion applies the result, performs the
/// post-processing and schedules the next tick.
fn load_metrics(&mut self, ctx: &mut Context<Self>) {
    // Clone the handle so the future owns it and does not borrow the actor.
    let collector = self.metrics_collect.clone();
    let peek = Self::do_peek_metrics(collector);
    peek.into_actor(self)
        .map(|result, actor, actor_ctx| {
            actor.update_peek_metrics(result);
            actor.after_peek_metrics();
            // Re-arm the timer only after this cycle has finished.
            actor.hb(actor_ctx);
        })
        .spawn(ctx);
}
/// Copies the current gauge, counter and histogram maps into a snapshot
/// stamped with `now_ms`.
fn build_snapshot(&self, now_ms: u64) -> MetricsSnapshot {
    let gauge_data_map = self.gauge_manager.data_map.clone();
    let counter_data_map = self.counter_manager.data_map.clone();
    let histogram_data_map = self.histogram_manager.data_map.clone();
    MetricsSnapshot {
        gauge_data_map,
        counter_data_map,
        histogram_data_map,
        snapshot_time: now_ms,
    }
}
/// Adds a snapshot to the given timeline group when the group's interval,
/// minus a 200 ms tolerance, has passed since its last recorded snapshot.
fn record_timeline_snapshot(&mut self, now_ms: u64, group_type: TimelineGroupType) {
    // Latest "last record" time that still counts as due at `now_ms`.
    let due_threshold = now_ms + 200 - group_type.get_interval_millis();
    let last_record_time = self
        .metrics_timeline_manager
        .get_last_record_time(&group_type);
    if due_threshold > last_record_time {
        let snapshot = self.build_snapshot(now_ms);
        self.metrics_timeline_manager
            .add_record(&group_type, snapshot);
    }
}
/// Schedules the next collection cycle after the configured
/// `metrics_collect_interval_second` delay.
fn hb(&mut self, ctx: &mut Context<Self>) {
    let delay = Duration::from_secs(self.app_sys_config.metrics_collect_interval_second);
    ctx.run_later(delay, |actor, actor_ctx| actor.load_metrics(actor_ctx));
}
/// Serialises counters, gauges, histograms and summaries, in that order,
/// into one UTF-8 string.
///
/// Summaries are recalculated from their histograms just before they are
/// written.
///
/// # Errors
/// Returns the first error from any store's `export`, or a UTF-8 conversion
/// error if the produced bytes are not valid UTF-8.
fn export(&mut self) -> anyhow::Result<String> {
    let mut buffer = BytesMut::new();
    self.counter_manager.export(&mut buffer)?;
    self.gauge_manager.export(&mut buffer)?;
    self.histogram_manager.export(&mut buffer)?;
    // Bring summaries in line with the histograms before exporting them.
    self.reset_summary();
    self.summary_manager.export(&mut buffer)?;
    let text = String::from_utf8(buffer.to_vec())?;
    Ok(text)
}
}
/// Actor lifecycle for the metrics manager.
impl Actor for MetricsManager {
    type Context = Context<Self>;

    /// Only logs that the actor is running; the collection loop is started
    /// from `inject`, not here.
    fn started(&mut self, _: &mut Self::Context) {
        log::info!("MetricsManager started");
    }
}
/// Dependency-injection hook for the metrics manager.
impl Inject for MetricsManager {
    type Context = Context<Self>;

    /// Resolves the collector from the factory data, passes the collection
    /// interval to the timeline manager, and starts the collection loop when
    /// metrics are enabled in the configuration.
    fn inject(
        &mut self,
        factory_data: FactoryData,
        _factory: BeanFactory,
        ctx: &mut Self::Context,
    ) {
        self.metrics_collect = MetricsActorCollect::from_factory(&factory_data);

        let collect_interval = self.app_sys_config.metrics_collect_interval_second;
        self.metrics_timeline_manager
            .set_least_interval(collect_interval);

        if !self.app_sys_config.metrics_enable {
            log::info!("metrics disable!");
            return;
        }

        log::info!(
            "metrics enable! log_interval: {}s",
            self.app_sys_config.metrics_log_interval_second
        );
        self.init(ctx);
    }
}
/// Message handling for metric recording, export and timeline queries.
impl Handler<MetricsRequest> for MetricsManager {
    type Result = anyhow::Result<MetricsResponse>;

    /// Handles one request.
    ///
    /// When metrics are disabled every request is answered with
    /// `MetricsResponse::None` and nothing is touched. Otherwise records are
    /// applied to the stores, `Export` returns the serialised text, and
    /// `TimelineQuery` is forwarded to the timeline manager.
    ///
    /// # Errors
    /// Propagates any error raised by `export`.
    fn handle(&mut self, msg: MetricsRequest, _ctx: &mut Self::Context) -> Self::Result {
        if !self.app_sys_config.metrics_enable {
            return Ok(MetricsResponse::None);
        }
        let response = match msg {
            MetricsRequest::Record(item) => {
                self.update_item_record(item);
                MetricsResponse::None
            }
            MetricsRequest::BatchRecord(items) => {
                items
                    .into_iter()
                    .for_each(|item| self.update_item_record(item));
                MetricsResponse::None
            }
            MetricsRequest::Export => MetricsResponse::ExportInfo(self.export()?),
            MetricsRequest::TimelineQuery(param) => {
                MetricsResponse::TimelineResponse(self.metrics_timeline_manager.query(param))
            }
        };
        Ok(response)
    }
}