use crossbeam_channel::{Receiver, Sender};
use tracing::*;
use once_cell::sync::Lazy;
use std::collections::BTreeMap;
use std::io::prelude::*;
use std::net::{Ipv4Addr, TcpListener, TcpStream};
use std::sync::Mutex;
use std::time::Duration;
use std::{fmt, thread};
/// Process-wide metrics state: the sending half of the update channel plus the
/// mutex-guarded `Tracker` that owns the receiving half and the aggregated values.
struct State {
/// Producer side of the channel; `send_chan` pushes every `(Metric, Action)` pair here.
chan: Sender<(Metric, Action)>,
/// Aggregated metric values together with the consumer side of the channel.
tracker: Mutex<Tracker>,
}
/// Lazily-initialised global state.
///
/// The first access creates the unbounded update channel, starts the
/// `epimetheus-drainer` background thread and builds the `State` value.
///
/// # Panics
///
/// Initialisation panics if the drainer thread cannot be spawned.
static STATE: Lazy<State> = Lazy::new(|| {
    let (sender, receiver) = crossbeam_channel::unbounded();

    // The drainer wakes up every 20 seconds and folds whatever is queued on the
    // channel into the tracker, so pending updates are applied even when nobody
    // calls `query` or hits the HTTP endpoint.
    let drainer = thread::Builder::new().name("epimetheus-drainer".into());
    drainer
        .spawn(move || loop {
            thread::sleep(Duration::from_secs(20));
            let mut tracker = STATE.tracker.lock().unwrap();
            tracker.update();
        })
        .expect("Failed to spawn drainer thread");

    State {
        chan: sender,
        tracker: Mutex::new(Tracker {
            metrics: BTreeMap::new(),
            chan: receiver,
        }),
    }
});
/// A metric identified by a static name and a list of labels.
///
/// Values of this type are consumed by the `set`/`add`/`min`/`max` methods, which
/// queue an update for the named metric.
pub struct Metric {
/// Metric name; written verbatim in front of the rendered labels when reported.
pub name: &'static str,
/// Label key/value pairs; each value is turned into a string when the update is applied.
pub labels: Labels,
}
/// Label list: each entry pairs a static key with a boxed, `Send`-able `Display` value.
type Labels = Vec<(&'static str, Box<dyn fmt::Display + Send>)>;
/// The operation a queued update applies to a metric's stored value.
enum Action {
/// Add the payload to the stored value.
Inc(f64),
/// Overwrite the stored value with the payload.
Set(f64),
/// Keep the smaller of the stored value and the payload.
Min(f64),
/// Keep the larger of the stored value and the payload.
Max(f64),
}
/// Update operations on a [`Metric`].
///
/// Every method consumes the metric and queues the request on the global
/// channel; the stored value changes when the tracker next drains that channel.
impl Metric {
    /// Queues a request to overwrite the metric's value with `x`.
    #[inline]
    pub fn set(self, x: f64) {
        let update = (self, Action::Set(x));
        send_chan(update);
    }

    /// Queues a request to add `x` to the metric's value.
    #[inline]
    pub fn add(self, x: f64) {
        let update = (self, Action::Inc(x));
        send_chan(update);
    }

    /// Queues a request to keep the smaller of the metric's value and `x`.
    #[inline]
    pub fn min(self, x: f64) {
        let update = (self, Action::Min(x));
        send_chan(update);
    }

    /// Queues a request to keep the larger of the metric's value and `x`.
    #[inline]
    pub fn max(self, x: f64) {
        let update = (self, Action::Max(x));
        send_chan(update);
    }
}
/// Pushes one `(Metric, Action)` update onto the global channel.
///
/// A send error is deliberately ignored: the update is simply dropped.
#[inline]
fn send_chan(x: (Metric, Action)) {
    if let Err(_undelivered) = STATE.chan.send(x) {
        // Best effort: the only failure mode of `send` is a disconnected
        // receiver, in which case there is nobody left to record the update.
    }
}
/// Builds a [`Metric`] from an identifier and an optional brace-delimited label list.
///
/// The metric name and the label keys are the stringified identifiers; each
/// label value is any expression, which is boxed as-is.
///
/// ```ignore
/// metric!(requests).add(1.0);
/// metric!(requests{code = 200, path = "/"}).add(1.0);
/// metric!(requests {
///     code = 200,
///     path = "/",
/// })
/// .add(1.0);
/// ```
///
/// A trailing comma after the last label is accepted, so multi-line
/// invocations can be formatted in the usual way.
#[macro_export]
macro_rules! metric {
    ($name:ident) => {
        $crate::Metric {
            name: stringify!($name),
            labels: Vec::new(),
        }
    };
    ($name:ident { $($key:ident = $val:expr),* $(,)? }) => {
        $crate::Metric {
            name: stringify!($name),
            labels: vec![$((stringify!($key), Box::new($val))),*],
        }
    };
}
/// Labels with their values already rendered to strings, keyed (and therefore
/// ordered and de-duplicated) by label name.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
struct LabelsDisplay(BTreeMap<&'static str, String>);

impl LabelsDisplay {
    /// Renders every label value with `to_string` and stores it under its key.
    /// When a key occurs more than once, the last occurrence wins.
    fn new(labels: Labels) -> LabelsDisplay {
        let mut rendered = BTreeMap::new();
        for (key, value) in labels {
            rendered.insert(key, value.to_string());
        }
        LabelsDisplay(rendered)
    }
}

/// Formats as `{key="value",key2="value2"}` in key order, or as the empty
/// string when there are no labels. Values are written with the `Debug`
/// formatting of `String`, which supplies the quotes and escaping.
impl fmt::Display for LabelsDisplay {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if self.0.is_empty() {
            return Ok(());
        }
        f.write_str("{")?;
        for (position, (key, value)) in self.0.iter().enumerate() {
            if position > 0 {
                f.write_str(",")?;
            }
            f.write_str(key)?;
            f.write_str("=")?;
            fmt::Debug::fmt(value, f)?;
        }
        f.write_str("}")
    }
}
/// Holds the aggregated metric values and the receiving end of the update channel.
struct Tracker {
/// Current value of every metric seen so far, keyed by name and rendered labels.
metrics: BTreeMap<(&'static str, LabelsDisplay), f64>,
/// Consumer side of the channel; drained by `update`.
chan: Receiver<(Metric, Action)>,
}
impl Tracker {
fn update(&mut self) {
let mut n = 0.;
for (metric, action) in self.chan.try_iter() {
let labels = LabelsDisplay::new(metric.labels);
let entry = self.metrics.entry((metric.name, labels)).or_insert(0.0);
match action {
Action::Inc(x) => *entry += x,
Action::Set(x) => *entry = x,
Action::Min(x) => *entry = entry.min(x),
Action::Max(x) => *entry = entry.max(x),
}
n += 1.;
}
let total_updates = ("epimetheus_total_updates", LabelsDisplay::new(vec![]));
let total_flushes = ("epimetheus_total_flushes", LabelsDisplay::new(vec![]));
*self.metrics.entry(total_updates).or_insert(0.) += n;
*self.metrics.entry(total_flushes).or_insert(0.) += 1.;
}
}
/// Applies all pending updates and returns a snapshot of every metric.
///
/// Each item is the metric's name immediately followed by its rendered labels,
/// paired with the metric's current value. The snapshot is a copy, so the
/// returned iterator does not observe later updates.
///
/// # Panics
///
/// Panics if the tracker mutex is poisoned.
pub fn query() -> impl Iterator<Item = (String, f64)> {
    let snapshot = {
        let mut guard = STATE.tracker.lock().unwrap();
        guard.update();
        guard.metrics.clone()
    };
    snapshot.into_iter().map(|(key, value)| {
        let (name, labels) = key;
        (format!("{}{}", name, labels), value)
    })
}
/// Starts the metrics HTTP server on a background thread.
///
/// The port is read from the `RUST_METRICS_PORT` environment variable; when the
/// variable is unset or does not parse as a `u16`, port 9898 is used. A failure
/// to start the server is logged as a warning and otherwise ignored.
pub fn spawn_http_server() {
    let port = match std::env::var("RUST_METRICS_PORT") {
        Ok(raw) => raw.parse::<u16>().unwrap_or(9898),
        Err(_) => 9898,
    };
    match try_spawn_http_server_on(port) {
        Ok(()) => {}
        Err(e) => warn!("HTTP thread failed to start: {}", e),
    }
}
/// Binds a listener on `127.0.0.1:port` and serves metrics from a dedicated
/// `epimetheus-http` thread.
///
/// For every accepted connection the pending updates are applied and the metric
/// map is copied while the tracker lock is held; the lock is released before
/// any socket I/O with the client happens. A slow or stalled client therefore
/// cannot block the drainer thread or `query`.
///
/// # Errors
///
/// Returns the I/O error if binding the socket or spawning the thread fails.
/// Errors while accepting or serving an individual connection are logged as
/// warnings and do not stop the server.
fn try_spawn_http_server_on(port: u16) -> std::io::Result<()> {
    let sock = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?;
    info!("Listening on port {}", port);
    std::thread::Builder::new()
        .name("epimetheus-http".into())
        .spawn(move || {
            // `incoming()` never yields `None`, so this loop runs for the
            // lifetime of the thread.
            for conn in sock.incoming() {
                let conn = match conn {
                    Ok(conn) => conn,
                    Err(e) => {
                        warn!("{}", e);
                        continue;
                    }
                };
                // Hold the lock only long enough to drain and copy the metrics.
                let snapshot = {
                    let mut tracker = STATE.tracker.lock().unwrap();
                    tracker.update();
                    tracker.metrics.clone()
                };
                if let Err(e) = handle_http_client(conn, &snapshot) {
                    warn!("{}", e);
                }
            }
        })?;
    Ok(())
}
/// Serves one HTTP client: skips the request head, then writes all metrics.
///
/// The request is read byte by byte until the blank line (`\r\n\r\n`) that ends
/// the header section, or until the client closes its side; its contents are
/// otherwise ignored. The response is a `200 OK` whose body has one line per
/// metric in the form `<name><labels> <value>`, sent with `Content-Type`,
/// `Content-Length` and `Connection: close` headers in a single write. The
/// connection is closed when this function returns.
///
/// # Errors
///
/// Returns any I/O error from reading the request (except `WouldBlock`, which
/// is retried) or from writing the response.
fn handle_http_client(
    conn: TcpStream,
    metrics: &BTreeMap<(&'static str, LabelsDisplay), f64>,
) -> Result<(), std::io::Error> {
    let mut reader = std::io::BufReader::with_capacity(128, conn);

    // Number of bytes of the terminating "\r\n\r\n" sequence matched so far.
    let mut matched = 0u8;
    for byte in std::io::Read::by_ref(&mut reader).bytes() {
        match byte {
            Ok(b) => {
                matched = match (matched, b) {
                    (0, b'\r') | (2, b'\r') | (1, b'\n') | (3, b'\n') => matched + 1,
                    // A stray '\r' may itself be the start of the terminator.
                    (_, b'\r') => 1,
                    _ => 0,
                };
                if matched == 4 {
                    break;
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
            Err(e) => return Err(e),
        }
    }

    // Render the body first so its exact length can be announced.
    let mut body = Vec::new();
    for ((name, labels), val) in metrics {
        writeln!(body, "{}{} {}", name, labels, val)?;
    }

    // Assemble the whole response in memory and send it with one write instead
    // of issuing a separate socket write per metric line.
    let mut response = Vec::with_capacity(body.len() + 128);
    response.extend_from_slice(b"HTTP/1.1 200 OK\r\n");
    response.extend_from_slice(b"Content-Type: text/plain; charset=utf-8\r\n");
    write!(response, "Content-Length: {}\r\n", body.len())?;
    response.extend_from_slice(b"Connection: close\r\n\r\n");
    response.extend_from_slice(&body);

    let mut conn = reader.into_inner();
    conn.write_all(&response)
}
#[cfg(test)]
mod tests {
    use crate::*;

    /// Returns the current value of the metric whose rendered key equals `key`.
    fn value_of(key: &str) -> Option<f64> {
        query().find(|(k, _)| k == key).map(|(_, v)| v)
    }

    /// Exercises set/add/max/min through the channel and `query`, without HTTP.
    #[test]
    fn test_non_http() -> Result<(), Box<dyn std::error::Error>> {
        metric!(foo).set(1.0);
        metric!(bar).add(1.0);
        metric!(bar).add(2.0);
        assert_eq!(value_of("foo"), Some(1.0));
        assert_eq!(value_of("bar"), Some(3.0));
        assert_eq!(value_of("qux"), None);

        // A max below the current value leaves it untouched.
        metric!(bar).max(1.5);
        assert_eq!(value_of("bar"), Some(3.0));

        // A min below the current value lowers it.
        metric!(bar).min(1.5);
        assert_eq!(value_of("bar"), Some(1.5));
        Ok(())
    }
}