rostrum 8.0.0

An efficient implementation of Electrum Server with token support
Documentation
use anyhow::{Context, Result};

use prometheus::{
    self, Encoder, Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounter, IntCounterVec,
    IntGauge, IntGaugeVec, Registry,
};
use tiny_http::{Response, Server};

use std::net::SocketAddr;

use crate::thread::spawn;

pub struct Metrics {
    reg: Registry,
}

impl Metrics {
    pub fn new(addr: SocketAddr) -> Result<Self> {
        let reg = Registry::new();

        let result = Self { reg };
        let reg = result.reg.clone();
        spawn("metrics", move || {
            let server = Server::http(addr).unwrap();
            for request in server.incoming_requests() {
                let mut buffer = vec![];
                prometheus::TextEncoder::new()
                    .encode(&reg.gather(), &mut buffer)
                    .context("failed to encode metrics")?;
                request
                    .respond(Response::from_data(buffer))
                    .context("failed to send HTTP response")?;
            }
            Ok(())
        });
        start_process_exporter(&result);

        info!("serving Prometheus metrics on {}", addr);
        Ok(result)
    }

    /// Constructor for use in unittests
    pub fn dummy() -> Self {
        Metrics {
            reg: Registry::new(),
        }
    }

    pub fn histogram_vec(
        &self,
        name: &str,
        desc: &str,
        labels: &[&str],
        buckets: Vec<f64>,
    ) -> HistogramVec {
        let opts = HistogramOpts::new(name, desc).buckets(buckets);
        let hist = HistogramVec::new(opts, labels).unwrap();
        self.reg
            .register(Box::new(hist.clone()))
            .expect("failed to register Histogram");
        hist
    }

    pub fn gauge_float_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> GaugeVec {
        let gauge = prometheus::GaugeVec::new(opts, labels).unwrap();
        self.reg
            .register(Box::new(gauge.clone()))
            .expect("failed to register Gauge");
        gauge
    }

    pub fn gauge_float(&self, opts: prometheus::Opts) -> Gauge {
        let g = Gauge::with_opts(opts).unwrap();
        self.reg.register(Box::new(g.clone())).unwrap();
        g
    }

    pub fn gauge_int_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> IntGaugeVec {
        let g = IntGaugeVec::new(opts, labels).unwrap();
        self.reg.register(Box::new(g.clone())).unwrap();
        g
    }

    pub fn gauge_int(&self, opts: prometheus::Opts) -> IntGauge {
        let g = IntGauge::with_opts(opts).unwrap();
        self.reg.register(Box::new(g.clone())).unwrap();
        g
    }
    pub fn counter_int(&self, opts: prometheus::Opts) -> IntCounter {
        let c = IntCounter::with_opts(opts).unwrap();
        self.reg.register(Box::new(c.clone())).unwrap();
        c
    }

    pub fn counter_int_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> IntCounterVec {
        let c = IntCounterVec::new(opts, labels).unwrap();
        self.reg.register(Box::new(c.clone())).unwrap();
        c
    }
}

#[cfg(target_os = "linux")]
struct Stats {
    utime: f64,
    rss: u64,
    fds: usize,
}

#[cfg(target_os = "linux")]
fn parse_stats() -> Result<Stats> {
    use std::fs;
    let value = fs::read_to_string("/proc/self/stat").context("failed to read /proc/self/stat")?;
    let parts: Vec<&str> = value.split_whitespace().collect();
    let page_size = page_size::get() as u64;
    let ticks_per_second = sysconf::raw::sysconf(sysconf::raw::SysconfVariable::ScClkTck)
        .expect("failed to get _SC_CLK_TCK") as f64;

    let parse_part = |index: usize, name: &str| -> Result<u64> {
        parts
            .get(index)
            .context(format!("missing {}: {:?}", name, parts))?
            .parse::<u64>()
            .context(format!("invalid {}: {:?}", name, parts))
    };

    // For details, see '/proc/[pid]/stat' section at `man 5 proc`:
    let utime = parse_part(13, "utime")? as f64 / ticks_per_second;
    let rss = parse_part(23, "rss")? * page_size;
    let fds = fs::read_dir("/proc/self/fd")
        .context("failed to read /proc/self/fd directory")?
        .count();
    Ok(Stats { utime, rss, fds })
}
#[cfg(not(target_os = "linux"))]
fn start_process_exporter(_metrics: &Metrics) {}

#[cfg(target_os = "linux")]
fn start_process_exporter(metrics: &Metrics) {
    use prometheus::Opts;
    use std::time::Duration;
    let rss = metrics.gauge_int(Opts::new(
        "rostrum_process_memory_rss",
        "Resident memory size [bytes]",
    ));
    let cpu = metrics.gauge_float_vec(
        Opts::new(
            "rostrum_process_cpu_usage",
            "CPU usage by this process [seconds]",
        ),
        &["type"],
    );
    let fds = metrics.gauge_int(Opts::new(
        "rostrum_process_open_fds",
        "# of file descriptors",
    ));

    let jemalloc_allocated = metrics.gauge_int(Opts::new(
        "rostrum_process_jemalloc_allocated",
        "# of bytes allocated by the application.",
    ));

    let jemalloc_resident = metrics.gauge_int(Opts::new(
        "rostrum_process_jemalloc_resident",
        "# of bytes in physically resident data pages mapped by the allocator",
    ));

    spawn("exporter", move || {
        use tikv_jemalloc_ctl::stats;
        let e = tikv_jemalloc_ctl::epoch::mib().unwrap();
        let allocated = stats::allocated::mib().unwrap();
        let resident = stats::resident::mib().unwrap();
        loop {
            if let Ok(stats) = parse_stats() {
                cpu.with_label_values(&["utime"]).set(stats.utime);
                rss.set(stats.rss as i64);
                fds.set(stats.fds as i64);
            }
            e.advance().unwrap();
            jemalloc_allocated.set(allocated.read().unwrap() as i64);
            jemalloc_resident.set(resident.read().unwrap() as i64);

            std::thread::sleep(Duration::from_secs(5));
        }
    });
}

pub(crate) fn default_duration_buckets() -> Vec<f64> {
    vec![
        1e-6, 2e-6, 5e-6, 1e-5, 2e-5, 5e-5, 1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2,
        1e-1, 2e-1, 5e-1, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
    ]
}

pub(crate) fn default_size_buckets() -> Vec<f64> {
    vec![
        1.0, 2.0, 5.0, 1e1, 2e1, 5e1, 1e2, 2e2, 5e2, 1e3, 2e3, 5e3, 1e4, 2e4, 5e4, 1e5, 2e5, 5e5,
        1e6, 2e6, 5e6, 1e7,
    ]
}

pub(crate) fn observe_duration<F, T>(hist: &HistogramVec, label: &str, func: F) -> T
where
    F: FnOnce() -> T,
{
    hist.with_label_values(&[label])
        .observe_closure_duration(func)
}