use anyhow::{Context, Result};
use prometheus::{
self, Encoder, Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounter, IntCounterVec,
IntGauge, IntGaugeVec, Registry,
};
use tiny_http::{Response, Server};
use std::net::SocketAddr;
use crate::thread::spawn;
pub struct Metrics {
reg: Registry,
}
impl Metrics {
pub fn new(addr: SocketAddr) -> Result<Self> {
let reg = Registry::new();
let result = Self { reg };
let reg = result.reg.clone();
spawn("metrics", move || {
let server = Server::http(addr).unwrap();
for request in server.incoming_requests() {
let mut buffer = vec![];
prometheus::TextEncoder::new()
.encode(®.gather(), &mut buffer)
.context("failed to encode metrics")?;
request
.respond(Response::from_data(buffer))
.context("failed to send HTTP response")?;
}
Ok(())
});
start_process_exporter(&result);
info!("serving Prometheus metrics on {}", addr);
Ok(result)
}
pub fn dummy() -> Self {
Metrics {
reg: Registry::new(),
}
}
pub fn histogram_vec(
&self,
name: &str,
desc: &str,
labels: &[&str],
buckets: Vec<f64>,
) -> HistogramVec {
let opts = HistogramOpts::new(name, desc).buckets(buckets);
let hist = HistogramVec::new(opts, labels).unwrap();
self.reg
.register(Box::new(hist.clone()))
.expect("failed to register Histogram");
hist
}
pub fn gauge_float_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> GaugeVec {
let gauge = prometheus::GaugeVec::new(opts, labels).unwrap();
self.reg
.register(Box::new(gauge.clone()))
.expect("failed to register Gauge");
gauge
}
pub fn gauge_float(&self, opts: prometheus::Opts) -> Gauge {
let g = Gauge::with_opts(opts).unwrap();
self.reg.register(Box::new(g.clone())).unwrap();
g
}
pub fn gauge_int_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> IntGaugeVec {
let g = IntGaugeVec::new(opts, labels).unwrap();
self.reg.register(Box::new(g.clone())).unwrap();
g
}
pub fn gauge_int(&self, opts: prometheus::Opts) -> IntGauge {
let g = IntGauge::with_opts(opts).unwrap();
self.reg.register(Box::new(g.clone())).unwrap();
g
}
pub fn counter_int(&self, opts: prometheus::Opts) -> IntCounter {
let c = IntCounter::with_opts(opts).unwrap();
self.reg.register(Box::new(c.clone())).unwrap();
c
}
pub fn counter_int_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> IntCounterVec {
let c = IntCounterVec::new(opts, labels).unwrap();
self.reg.register(Box::new(c.clone())).unwrap();
c
}
}
#[cfg(target_os = "linux")]
struct Stats {
utime: f64,
rss: u64,
fds: usize,
}
#[cfg(target_os = "linux")]
fn parse_stats() -> Result<Stats> {
use std::fs;
let value = fs::read_to_string("/proc/self/stat").context("failed to read /proc/self/stat")?;
let parts: Vec<&str> = value.split_whitespace().collect();
let page_size = page_size::get() as u64;
let ticks_per_second = sysconf::raw::sysconf(sysconf::raw::SysconfVariable::ScClkTck)
.expect("failed to get _SC_CLK_TCK") as f64;
let parse_part = |index: usize, name: &str| -> Result<u64> {
parts
.get(index)
.context(format!("missing {}: {:?}", name, parts))?
.parse::<u64>()
.context(format!("invalid {}: {:?}", name, parts))
};
let utime = parse_part(13, "utime")? as f64 / ticks_per_second;
let rss = parse_part(23, "rss")? * page_size;
let fds = fs::read_dir("/proc/self/fd")
.context("failed to read /proc/self/fd directory")?
.count();
Ok(Stats { utime, rss, fds })
}
#[cfg(not(target_os = "linux"))]
fn start_process_exporter(_metrics: &Metrics) {}
#[cfg(target_os = "linux")]
fn start_process_exporter(metrics: &Metrics) {
use prometheus::Opts;
use std::time::Duration;
let rss = metrics.gauge_int(Opts::new(
"rostrum_process_memory_rss",
"Resident memory size [bytes]",
));
let cpu = metrics.gauge_float_vec(
Opts::new(
"rostrum_process_cpu_usage",
"CPU usage by this process [seconds]",
),
&["type"],
);
let fds = metrics.gauge_int(Opts::new(
"rostrum_process_open_fds",
"# of file descriptors",
));
let jemalloc_allocated = metrics.gauge_int(Opts::new(
"rostrum_process_jemalloc_allocated",
"# of bytes allocated by the application.",
));
let jemalloc_resident = metrics.gauge_int(Opts::new(
"rostrum_process_jemalloc_resident",
"# of bytes in physically resident data pages mapped by the allocator",
));
spawn("exporter", move || {
use tikv_jemalloc_ctl::stats;
let e = tikv_jemalloc_ctl::epoch::mib().unwrap();
let allocated = stats::allocated::mib().unwrap();
let resident = stats::resident::mib().unwrap();
loop {
if let Ok(stats) = parse_stats() {
cpu.with_label_values(&["utime"]).set(stats.utime);
rss.set(stats.rss as i64);
fds.set(stats.fds as i64);
}
e.advance().unwrap();
jemalloc_allocated.set(allocated.read().unwrap() as i64);
jemalloc_resident.set(resident.read().unwrap() as i64);
std::thread::sleep(Duration::from_secs(5));
}
});
}
pub(crate) fn default_duration_buckets() -> Vec<f64> {
vec![
1e-6, 2e-6, 5e-6, 1e-5, 2e-5, 5e-5, 1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2,
1e-1, 2e-1, 5e-1, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
]
}
pub(crate) fn default_size_buckets() -> Vec<f64> {
vec![
1.0, 2.0, 5.0, 1e1, 2e1, 5e1, 1e2, 2e2, 5e2, 1e3, 2e3, 5e3, 1e4, 2e4, 5e4, 1e5, 2e5, 5e5,
1e6, 2e6, 5e6, 1e7,
]
}
pub(crate) fn observe_duration<F, T>(hist: &HistogramVec, label: &str, func: F) -> T
where
F: FnOnce() -> T,
{
hist.with_label_values(&[label])
.observe_closure_duration(func)
}