epimetheus 0.8.0

An easy-to-use prometheus-compatible metrics framework
Documentation
/*! An easy-to-use prometheus-compatible metrics library

# Writing metrics

A "metric" is a named value of type `f64`.  There is a single global set of
metrics; you can update it from any thread.

```
use epimetheus::metric;

metric!(foobar).set(12.3);
metric!(foobar).add(0.7);
```

If you increment a metric which has never been set, it is considered to
start from zero.

```
# use epimetheus::metric;
metric!(barqux).add(6.5);
// now barqux = 6.5
```

## Labels

The base part of the name is fixed at compile time.  However,
a metric's name may also include "labels", which are dynamic.

```
# use epimetheus::metric;
let user_id = 7;
metric!(login_attempts{user=user_id}).add(1.0);
```

The label values can be anything which implements `Display`.

```
# use epimetheus::metric;
# let user_id = 0;
# let passwd = 0;
# let try_log_in = |_, _| 0;
// enum LoginResult { Success, BadUsername, BadPassword }
// impl Display for LoginResult { ... }

let result = try_log_in(user_id, passwd);
metric!(login_attempts{user=user_id, result=result}).add(1.0);
```

Labels can be useful, but they come at a performance cost (see README).

# Seeing your metrics

## ...via a function call

You can call `query()` to see the current value of the metrics:

```
# use epimetheus::metric;
# metric!(foobar).set(12.3);
# metric!(foobar).add(0.7);
# metric!(barqux).set(6.5);
# metric!(login_attempts{result="Success",user=7}).set(1.);
# metric!(login_attempts{user=7}).set(1.);
let mut metrics = epimetheus::query();
assert_eq!(metrics.next(), Some(("barqux".to_string(), 6.5)));
assert_eq!(metrics.next(), Some(("epimetheus_total_flushes".to_string(), 1.)));
assert_eq!(metrics.next(), Some(("epimetheus_total_updates".to_string(), 5.)));
assert_eq!(metrics.next(), Some(("foobar".to_string(), 13.)));
assert_eq!(metrics.next(), Some(("login_attempts{result=\"Success\",user=\"7\"}".to_string(), 1.)));
assert_eq!(metrics.next(), Some(("login_attempts{user=\"7\"}".to_string(), 1.)));
```

Note the "epimetheus_*" lines: these are metrics exposed by epimetheus itself.

## ...via HTTP

If you want to view the metrics externally, you should call
`spawn_http_server()`.  This will spawn a new thread which will serve metrics
over HTTP.

```
epimetheus::spawn_http_server();
```

Connect to the server to see the current values of all metrics.  Metrics appear
in the output after being updated for the first time.

```text
$ curl localhost:9898
barqux 6.5
epimetheus_total_flushes 2
epimetheus_total_updates 5
foobar 13
login_attempts{result="Success",user="7"} 1
login_attempts{user="7"} 1
```

*/

use crossbeam_channel::{Receiver, Sender};
use tracing::*;
use once_cell::sync::Lazy;
use std::collections::BTreeMap;
use std::io::prelude::*;
use std::net::{Ipv4Addr, TcpListener, TcpStream};
use std::sync::Mutex;
use std::time::Duration;
use std::{fmt, thread};

/// Process-wide metrics state, created lazily on first use.
struct State {
    /// Sending half of the update channel: every metric update is pushed here
    /// without taking the tracker lock.
    chan: Sender<(Metric, Action)>,
    /// Current metric values together with the receiving half of the channel.
    tracker: Mutex<Tracker>,
}

/// The single global metrics state.
///
/// Initialising it also spawns the `epimetheus-drainer` thread, which applies
/// queued updates roughly every 20 seconds.
static STATE: Lazy<State> = Lazy::new(|| {
    let (tx, rx) = crossbeam_channel::unbounded();
    let state = State {
        chan: tx,
        tracker: Mutex::new(Tracker {
            metrics: BTreeMap::default(),
            chan: rx,
        }),
    };
    // We want to ensure that the channel is regularly drained, even when
    // there are no new connections coming in.  (Otherwise, we'd have a
    // memory leak and - worse - the metric update latency would suffer.)
    // Therefore we spawn a thread which regularly drains the channel.
    thread::Builder::new()
        .name("epimetheus-drainer".into())
        .spawn(move || loop {
            // Sleep first: this thread is started from inside STATE's own
            // initialiser, so there is nothing to drain yet.
            thread::sleep(Duration::from_secs(20));
            // `update()` runs user-supplied `Display` impls while the lock is
            // held; if one of them panics the mutex becomes poisoned.  Keep
            // draining regardless: if this thread died, the unbounded channel
            // would grow without limit, which is what it exists to prevent.
            STATE
                .tracker
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .update();
        })
        .expect("Failed to spawn drainer thread");
    state
});

/// A named metric; it has an associated global mutable `f64` value.
///
/// You can create these by hand, but you might find it more convenient to
/// use the `metric!()` macro.  A `Metric` is consumed by whichever update
/// method (`set`, `add`, `min` or `max`) is called on it.
pub struct Metric {
    /// The base name of the metric, e.g. `login_attempts`.
    pub name: &'static str,
    /// Label key/value pairs; each value is rendered with its `Display` impl.
    pub labels: Labels,
}

/// Label key/value pairs attached to a [`Metric`].
type Labels = Vec<(&'static str, Box<dyn fmt::Display + Send>)>;
/// A single queued update to a metric's value.
enum Action {
    /// Replace the current value.
    Set(f64),
    /// Add to the current value.
    Inc(f64),
    /// Keep whichever is smaller: the current value or this one.
    Min(f64),
    /// Keep whichever is larger: the current value or this one.
    Max(f64),
}

impl Metric {
    /// Set the metric to the specified value.
    #[inline]
    pub fn set(self, x: f64) {
        send_chan((self, Action::Set(x)));
    }

    /// Increment the metric by the specified amount.
    #[inline]
    pub fn add(self, x: f64) {
        send_chan((self, Action::Inc(x)));
    }

    /// Set the metric to the specified value if it is smaller than the
    /// current value.
    #[inline]
    pub fn min(self, x: f64) {
        send_chan((self, Action::Min(x)));
    }

    /// Set the metric to the specified value if it is larger than the
    /// current value.
    #[inline]
    pub fn max(self, x: f64) {
        send_chan((self, Action::Max(x)));
    }
}

/// Queue an update on the global channel.
///
/// The update takes effect the next time the tracker drains the channel:
/// on `query()`, on an HTTP scrape, or when the background drainer thread
/// wakes up.
#[inline]
fn send_chan(x: (Metric, Action)) {
    // The channel is unbounded, so sending never blocks.  It can only fail if
    // the receiver has been dropped, but the receiver is owned by the global
    // `STATE.tracker`, which is a static and therefore never dropped.  Updates
    // are still queued (and later drained) when no HTTP server is running.
    // Ignore the impossible error rather than panic on the hot path.
    let _ = STATE.chan.send(x);
}

/// Refer to a metric.
///
/// `metric!(name)` refers to the unlabelled metric `name`, while
/// `metric!(name{key = value, ...})` attaches labels.  Each `value` can be any
/// expression whose type implements `Display + Send + 'static`.  A trailing
/// comma after the last label is accepted.
///
/// ```
/// # use epimetheus::metric;
/// metric!(requests).add(1.0);
/// metric!(requests{method = "GET", status = 200,}).add(1.0);
/// ```
#[macro_export]
macro_rules! metric {
    ($name:ident) => {
        $crate::Metric {
            name: ::std::stringify!($name),
            labels: ::std::vec::Vec::new(),
        }
    };
    ($name:ident{$($key:ident = $val:expr),* $(,)?}) => {
        $crate::Metric {
            name: ::std::stringify!($name),
            // Paths are fully qualified so the expansion still works if the
            // caller has shadowed `Vec`, `Box` or `vec!`; the explicit cast
            // pins each value to the boxed trait object `Metric` stores.
            labels: ::std::vec![$((
                ::std::stringify!($key),
                ::std::boxed::Box::new($val)
                    as ::std::boxed::Box<dyn ::std::fmt::Display + ::std::marker::Send>,
            )),*],
        }
    };
}

/// The labels of a metric, with values rendered to strings and sorted by key.
///
/// Its `Display` impl produces a Prometheus label set such as
/// `{result="Success",user="7"}`, or nothing at all when there are no labels.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
struct LabelsDisplay(BTreeMap<&'static str, String>);

impl LabelsDisplay {
    /// Render every label value with its `Display` impl and sort by key.
    fn new(labels: Labels) -> LabelsDisplay {
        LabelsDisplay(
            labels
                .into_iter()
                .map(|(k, v)| (k, v.to_string()))
                .collect(),
        )
    }
}

impl fmt::Display for LabelsDisplay {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if self.0.is_empty() {
            return Ok(());
        }
        f.write_str("{")?;
        for (i, (key, value)) in self.0.iter().enumerate() {
            if i > 0 {
                f.write_str(",")?;
            }
            f.write_str(key)?;
            f.write_str("=")?;
            write_label_value(f, value)?;
        }
        f.write_str("}")
    }
}

/// Write `value` as a double-quoted Prometheus label value.
///
/// The text exposition format defines exactly three escapes inside label
/// values: backslash as `\\`, double quote as `\"` and line feed as `\n`.
/// Every other character (tabs, carriage returns, non-ASCII text) is written
/// verbatim.  Rust's `Debug` string formatting is not a substitute: it also
/// emits escapes such as `\t`, `\r` and `\u{…}` that are not part of the format.
fn write_label_value(f: &mut fmt::Formatter, value: &str) -> fmt::Result {
    f.write_str("\"")?;
    // Copy runs of ordinary characters as slices; only stop at characters
    // that need escaping.
    let mut start = 0;
    for (i, c) in value.char_indices() {
        let escaped = match c {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            _ => continue,
        };
        f.write_str(&value[start..i])?;
        f.write_str(escaped)?;
        start = i + c.len_utf8();
    }
    f.write_str(&value[start..])?;
    f.write_str("\"")
}

/// Tracks the current state of the metrics.
struct Tracker {
    /// Stores the current value of all the metrics in a map.  (We use a
    /// BTreeMap so the metrics are nicely sorted when we print them.)
    metrics: BTreeMap<(&'static str, LabelsDisplay), f64>,
    /// Receiving half of the global update channel.
    chan: Receiver<(Metric, Action)>,
}

impl Tracker {
    /// Apply every update currently waiting on the channel.  Afterwards, add
    /// the number of updates applied to `epimetheus_total_updates` and add one
    /// to `epimetheus_total_flushes`.
    ///
    /// A metric seen for the first time takes the value carried by its first
    /// action.  For `add` this is the same as starting from zero.  For
    /// `min`/`max` it means the first observation is recorded as-is.
    /// Comparing it against an implicit `0.0` would make, e.g., the minimum of
    /// positive samples always read zero.
    fn update(&mut self) {
        use std::collections::btree_map::Entry;

        let mut applied: u64 = 0;
        for (metric, action) in self.chan.try_iter() {
            let key = (metric.name, LabelsDisplay::new(metric.labels));
            match self.metrics.entry(key) {
                Entry::Vacant(slot) => {
                    let first = match action {
                        Action::Inc(x) | Action::Set(x) | Action::Min(x) | Action::Max(x) => x,
                    };
                    slot.insert(first);
                }
                Entry::Occupied(slot) => {
                    let current = slot.into_mut();
                    match action {
                        Action::Inc(x) => *current += x,
                        Action::Set(x) => *current = x,
                        Action::Min(x) => *current = current.min(x),
                        Action::Max(x) => *current = current.max(x),
                    }
                }
            }
            applied += 1;
        }
        let no_labels = || LabelsDisplay::new(Vec::new());
        *self
            .metrics
            .entry(("epimetheus_total_updates", no_labels()))
            .or_insert(0.) += applied as f64;
        *self
            .metrics
            .entry(("epimetheus_total_flushes", no_labels()))
            .or_insert(0.) += 1.;
    }
}

/// Get the current state of the metrics.
///
/// Pending updates are applied first.  The function then returns a snapshot
/// of every metric, including the `epimetheus_*` self-metrics, as
/// `(rendered name, value)` pairs sorted by metric name and then by labels.
///
/// ```
/// use epimetheus::metric;
/// metric!(a_metric).set(42.0);
/// assert_eq!(
///     epimetheus::query().next(),
///     Some(("a_metric".to_string(), 42.0))
/// );
/// ```
pub fn query() -> impl Iterator<Item = (String, f64)> {
    let snapshot = {
        // `update()` runs user-supplied `Display` impls under the lock, so a
        // panic there can poison the mutex.  Recover the guard instead of
        // making every later `query()` panic too.
        let mut tracker = STATE
            .tracker
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        tracker.update();
        tracker.metrics.clone()
        // The guard is dropped here, before the names are rendered.
    };
    snapshot
        .into_iter()
        .map(|((name, labels), val)| (format!("{}{}", name, labels), val))
}

/// Spawn a thread which serves metrics over HTTP.
///
/// By default the HTTP server runs on port 9898, but you can change this by
/// setting the `RUST_METRICS_PORT` environment variable.  If the variable is
/// set but does not hold a valid port number, a warning is logged and the
/// default port is used.  Tip: If you want to specify the metrics port in
/// your application itself, you can do so like this:
///
/// ```
/// std::env::set_var("RUST_METRICS_PORT", "1234");
/// epimetheus::spawn_http_server();
/// ```
///
/// If the server cannot be started (e.g. the port is already in use), a
/// warning is logged; no error is returned.
pub fn spawn_http_server() {
    const DEFAULT_PORT: u16 = 9898;
    let port = match std::env::var("RUST_METRICS_PORT") {
        Err(std::env::VarError::NotPresent) => DEFAULT_PORT,
        Err(e) => {
            warn!("Ignoring RUST_METRICS_PORT ({}); using port {}", e, DEFAULT_PORT);
            DEFAULT_PORT
        }
        Ok(raw) => raw.parse::<u16>().unwrap_or_else(|e| {
            warn!(
                "Ignoring invalid RUST_METRICS_PORT {:?} ({}); using port {}",
                raw, e, DEFAULT_PORT
            );
            DEFAULT_PORT
        }),
    };
    if let Err(e) = try_spawn_http_server_on(port) {
        warn!("HTTP thread failed to start: {}", e);
    }
}

/// Bind `127.0.0.1:port` and spawn the `epimetheus-http` thread.
///
/// The thread serves each incoming connection in two steps:
///
/// 1. Drain any updates from the channel into the global metrics map and take
///    a snapshot of it.  The tracker lock is released immediately, so a slow
///    or silent client cannot stall `query()` or the drainer thread.
/// 2. Render the snapshot in Prometheus exposition format and send it to the
///    client.
///
/// Only the loopback interface is bound, so the endpoint is not reachable
/// from other hosts.
///
/// # Errors
/// Returns an error if the socket cannot be bound or the thread cannot be
/// spawned.  Errors on individual connections are logged and do not stop
/// the server.
fn try_spawn_http_server_on(port: u16) -> std::io::Result<()> {
    let sock = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?;
    info!("Listening on port {}", port);
    std::thread::Builder::new()
        .name("epimetheus-http".into())
        .spawn(move || {
            // `incoming()` never yields `None`, so this serves forever.
            for conn in sock.incoming() {
                let conn = match conn {
                    Ok(conn) => conn,
                    Err(e) => {
                        warn!("{}", e);
                        continue;
                    }
                };
                let snapshot = {
                    // `update()` runs user-supplied `Display` impls under the
                    // lock; recover from poisoning so the server keeps going.
                    let mut tracker = STATE
                        .tracker
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    tracker.update();
                    tracker.metrics.clone()
                };
                if let Err(e) = handle_http_client(conn, &snapshot) {
                    warn!("{}", e);
                }
            }
        })?;
    Ok(())
}

/// This is the world's simplest HTTP implementation.  It completely
/// ignores the request, unconditionally sending the same response.
/// This response comes with no headers or anything - just a body.
fn handle_http_client(
    conn: TcpStream,
    metrics: &BTreeMap<(&'static str, LabelsDisplay), f64>,
) -> Result<(), std::io::Error> {
    // We don't care about the request, but some HTTP clients get upset if you
    // don't at least read it.  Unfortunate.
    //
    // HTTP clients keep connection open: to know when to stop reading, you have
    // to look for \r\n\r\n in the stream. We do this below.
    let mut conn = std::io::BufReader::with_capacity(128, conn);
    let mut progress = 0;
    for b in std::io::Read::by_ref(&mut conn).bytes() {
        match b {
            Ok(b) => match progress {
                0 if b == b'\r' => progress = 1,
                1 if b == b'\n' => progress = 2,
                2 if b == b'\r' => progress = 3,
                3 if b == b'\n' => break,
                _ => progress = 0,
            },
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
            Err(e) => return Err(e),
        }
    }
    let mut conn = conn.into_inner();
    writeln!(conn, "HTTP/1.1 200 OK\r\n")?;
    for ((name, labels), val) in metrics {
        writeln!(conn, "{}{} {}", name, labels, val)?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use crate::*;

    /// Current value of the unlabelled metric `name`, if it has been written.
    fn value_of(name: &str) -> Option<f64> {
        query()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value)
    }

    #[test]
    fn test_non_http() -> Result<(), Box<dyn std::error::Error>> {
        metric!(foo).set(1.0);
        metric!(bar).add(1.0);
        metric!(bar).add(2.0);
        assert_eq!(value_of("foo"), Some(1.0));
        assert_eq!(value_of("bar"), Some(3.0));
        assert_eq!(value_of("qux"), None);

        // max() with a value below the current one leaves it unchanged...
        metric!(bar).max(1.5);
        assert_eq!(value_of("bar"), Some(3.0));
        // ...whereas min() with a value below the current one replaces it.
        metric!(bar).min(1.5);
        assert_eq!(value_of("bar"), Some(1.5));
        Ok(())
    }
}