libaster/
metrics.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
pub mod slowlog;
pub mod tracker;

pub use tracker::Tracker;

use crate::com::AsError;
use crate::ASTER_VERSION as VERSION;

use std::thread;
use std::time::Duration;

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use prometheus::{
    self, Encoder, Gauge, GaugeVec, HistogramVec, IntCounter, IntCounterVec, TextEncoder,
};
use sysinfo::{ProcessExt, SystemExt};

lazy_static! {
    // Process-wide metrics. Each one is registered through the `prometheus`
    // `register_*!` macros on first access; a registration failure panics via
    // `unwrap()`.

    // Gauge, labelled by cluster, moved up by `front_conn_incr` and down by
    // `front_conn_decr`.
    static ref ASTER_FRONT_CONNECTIONS: GaugeVec = {
        let opt = opts!(
            "aster_front_connection",
            "each front nodes connections gauge"
        );
        register_gauge_vec!(opt, &["cluster"]).unwrap()
    };
    // Counter, labelled by cluster, bumped once per `front_conn_incr` call and
    // never decremented. The help text used to be a copy of the gauge's help,
    // which mislabelled this metric as a gauge in the exposition output.
    static ref ASTER_FRONT_INCR: IntCounterVec = {
        let opt = opts!(
            "aster_front_connection_incr",
            "each front nodes connections increment counter"
        );
        register_int_counter_vec!(opt, &["cluster"]).unwrap()
    };
    // Version marker: `init` sets the series labelled with the running
    // version to 1.0.
    static ref ASTER_VERSION: GaugeVec = {
        let opt = opts!("aster_version", "aster current running version");
        register_gauge_vec!(opt, &["version"]).unwrap()
    };
    // Memory usage of the current process, written by `measure_system`.
    static ref ASTER_MEMORY: Gauge = {
        let opt = opts!("aster_memory_usage", "aster current memory usage");
        register_gauge!(opt).unwrap()
    };
    // CPU usage of the current process, written by `measure_system`.
    static ref ASTER_CPU: Gauge = {
        let opt = opts!("aster_cpu_usage", "aster current cpu usage");
        register_gauge!(opt).unwrap()
    };
    // Bumped by `thread_incr`; being a counter it only ever grows.
    static ref ASTER_THREADS: IntCounter = {
        let opt = opts!("aster_thread_count", "aster thread count counter");
        register_int_counter!(opt).unwrap()
    };
    // Bumped by `global_error_incr`.
    static ref ASTER_GLOBAL_ERROR: IntCounter = {
        let opt = opts!("aster_global_error", "aster global error counter");
        register_int_counter!(opt).unwrap()
    };
    // Per-cluster histogram handed out by `total_tracker`.
    // NOTE(review): bucket bounds look like microseconds - confirm against `Tracker`.
    static ref ASTER_TOTAL_TIMER: HistogramVec = {
        register_histogram_vec!(
            "aster_total_timer",
            "set up each cluster command proxy total timer",
            &["cluster"],
            vec![1_000.0, 10_000.0, 40_000.0, 100_000.0, 200_000.0]
        )
        .unwrap()
    };
    // Per-cluster histogram handed out by `remote_tracker`.
    // NOTE(review): bucket bounds look like microseconds - confirm against `Tracker`.
    static ref ASTER_REMOTE_TIMER: HistogramVec = {
        register_histogram_vec!(
            "aster_remote_timer",
            "set up each cluster command proxy remote timer",
            &["cluster"],
            vec![1_000.0, 10_000.0, 100_000.0]
        )
        .unwrap()
    };
}

/// Records a newly opened front connection for `cluster`.
///
/// Bumps both the monotonically increasing `aster_front_connection_incr`
/// counter and the `aster_front_connection` gauge for that cluster label.
pub fn front_conn_incr(cluster: &str) {
    let labels = [cluster];
    let opened_total = ASTER_FRONT_INCR.with_label_values(&labels);
    opened_total.inc();
    let open_now = ASTER_FRONT_CONNECTIONS.with_label_values(&labels);
    open_now.inc()
}

/// Records a closed front connection for `cluster` by decrementing the
/// `aster_front_connection` gauge for that cluster label.
pub fn front_conn_decr(cluster: &str) {
    let open_now = ASTER_FRONT_CONNECTIONS.with_label_values(&[cluster]);
    open_now.dec()
}

/// Adds one to the `aster_global_error` counter.
pub fn global_error_incr() {
    ASTER_GLOBAL_ERROR.inc_by(1);
}

/// Builds a [`Tracker`] bound to the `aster_remote_timer` histogram series
/// labelled with `cluster`.
pub fn remote_tracker(cluster: &str) -> Tracker {
    let histogram = ASTER_REMOTE_TIMER.with_label_values(&[cluster]);
    Tracker::new(histogram)
}

/// Builds a [`Tracker`] bound to the `aster_total_timer` histogram series
/// labelled with `cluster`.
pub fn total_tracker(cluster: &str) -> Tracker {
    let histogram = ASTER_TOTAL_TIMER.with_label_values(&[cluster]);
    Tracker::new(histogram)
}

fn show_metrics() -> impl Responder {
    let encoder = TextEncoder::new();
    let mut buffer = vec![];
    let metric_familys = prometheus::gather();
    encoder.encode(&metric_familys[..], &mut buffer).unwrap();
    HttpResponse::Ok().body(buffer)
}

/// Adds one to the `aster_thread_count` counter.
pub fn thread_incr() {
    ASTER_THREADS.inc_by(1);
}

/// Periodically samples the current process and publishes the values to the
/// `aster_memory_usage` and `aster_cpu_usage` gauges.
///
/// The function loops forever, sleeping 30 seconds between samples, so it is
/// meant to be run on its own thread (it counts itself via `thread_incr`).
///
/// # Errors
///
/// Returns `AsError::SystemError` when the current pid cannot be determined.
///
/// Returns `Ok(())` when the process can no longer be refreshed or looked up
/// through `sysinfo`.
pub fn measure_system() -> Result<(), AsError> {
    // Register the global rayon pool with a single thread to reduce the
    // thread count. `build_global` fails when the global pool was already
    // initialised; that must not take the metrics thread down, so the
    // failure is only logged.
    if let Err(err) = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build_global()
    {
        warn!(
            "fail to limit rayon global thread pool to one thread due {}",
            err
        );
    }

    thread_incr();
    let pid = sysinfo::get_current_pid().map_err(|err| {
        warn!("fail get pid of current aster due {}", err);
        AsError::SystemError
    })?;

    let sleep_interval = Duration::from_secs(30);
    let mut system = sysinfo::System::new();
    system.refresh_all();
    loop {
        // Refresh only our own process entry; stop sampling once it is gone.
        if !system.refresh_process(pid) {
            return Ok(());
        }
        let process = match system.get_process(pid) {
            Some(process) => process,
            None => return Ok(()),
        };
        // NOTE(review): the unit of `memory()` depends on the sysinfo version - confirm.
        ASTER_MEMORY.set(process.memory() as f64);
        ASTER_CPU.set(f64::from(process.cpu_usage()));
        thread::sleep(sleep_interval);
    }
}

/// Starts the HTTP server that exposes `/metrics` on `0.0.0.0:<port>`.
///
/// Before binding, the `aster_version` gauge is set to 1.0 for the running
/// version and the thread counter is bumped once. The server uses one
/// worker, a 3 second shutdown timeout and has signal handling disabled.
///
/// # Errors
///
/// Errors from `bind` and `run` are propagated with `?`, converted into
/// `AsError`.
pub fn init(port: usize) -> Result<(), AsError> {
    ASTER_VERSION.with_label_values(&[VERSION]).set(1.0);
    thread_incr();
    let addr = format!("0.0.0.0:{}", port);
    // Log the full bind address; the message previously printed only the port.
    info!("listen http metrics port in addr {}", addr);
    HttpServer::new(|| App::new().route("/metrics", web::get().to(show_metrics)))
        .shutdown_timeout(3)
        .disable_signals()
        .workers(1)
        .bind(&addr)?
        .run()?;
    Ok(())
}