1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
/*!
An easy-to-use prometheus-compatible metrics library

# Writing metrics

A "metric" is a named value of type `f64`.  There is a single global set of
metrics; you can update it from any thread.

```
use epimetheus::metric;

metric!(foobar).set(12.3);
metric!(foobar).add(0.7);
```

If you increment a metric which has never been set, it is considered to
start from zero.

```
# use epimetheus::metric;
metric!(barqux).add(6.5); // now barqux = 6.5
```

## Labels

The base part of the name is fixed statically at compile-time.  However,
a metric's name may also include "labels", which are dynamic.

```
# use epimetheus::metric;
let user_id = 7;
metric!(login_attempts{user=user_id}).add(1.0);
```

The label values can be anything which implements `Display`.

```
# use epimetheus::metric;
# let user_id = 0;
# let passwd = 0;
# let try_log_in = |_, _| 0;
// enum LoginResult { Success, BadUsername, BadPassword }
// impl Display for LoginResult { ... }
let result = try_log_in(user_id, passwd);
metric!(login_attempts{user=user_id, result=result}).add(1.0);
```

Labels can be useful, but they come at a performance cost (see README).
# Seeing your metrics

## ...via a function call

You can call `query()` to see the current value of the metrics:

```
# use epimetheus::metric;
# metric!(foobar).set(12.3);
# metric!(foobar).add(0.7);
# metric!(barqux).set(6.5);
# metric!(login_attempts{result="Success",user=7}).set(1.);
# metric!(login_attempts{user=7}).set(1.);
let mut metrics = epimetheus::query();
assert_eq!(metrics.next(), Some(("barqux".to_string(), 6.5)));
assert_eq!(metrics.next(), Some(("epimetheus_total_flushes".to_string(), 1.)));
assert_eq!(metrics.next(), Some(("epimetheus_total_updates".to_string(), 5.)));
assert_eq!(metrics.next(), Some(("foobar".to_string(), 13.)));
assert_eq!(metrics.next(), Some(("login_attempts{result=\"Success\",user=\"7\"}".to_string(), 1.)));
assert_eq!(metrics.next(), Some(("login_attempts{user=\"7\"}".to_string(), 1.)));
```

Note the "epimetheus_*" lines: these are metrics exposed by epimetheus itself.

## ...via HTTP

If you want to view the metrics externally, you should call
`spawn_http_server()`.  This will spawn a new thread which will serve
metrics over HTTP.

```
epimetheus::spawn_http_server();
```

Connect to the server to see the current values of all metrics.
Metrics appear in the output after being updated for the first time.
```text $ curl localhost:9898 barqux 6.5 epimetheus_total_flushes 2 epimetheus_total_updates 5 foobar 13 login_attempts{result="Success",user="7"} 1 login_attempts{user="7"} 1 ``` */ use crossbeam_channel::{Receiver, Sender}; use log::*; use once_cell::sync::Lazy; use std::collections::BTreeMap; use std::io::prelude::*; use std::net::{Ipv4Addr, TcpListener, TcpStream}; use std::sync::Mutex; use std::time::Duration; use std::{fmt, thread}; struct State { chan: Sender<(Metric, Action)>, tracker: Mutex<Tracker>, } static STATE: Lazy<State> = Lazy::new(|| { let (tx, rx) = crossbeam_channel::unbounded(); let state = State { chan: tx, tracker: Mutex::new(Tracker { metrics: BTreeMap::default(), chan: rx, }), }; // We want to ensure that the channel is regularly drained, even when // there are no new connections coming in. (Otherwise, we'd have a // memory leak and - worse - the metric update latency would suffer.) // Therefore we spawn a thread which regularly drains the channel. thread::Builder::new() .name("epimetheus-drainer".into()) .spawn(move || loop { // We sleep first to avoid recursing thread::sleep(Duration::from_secs(20)); STATE.tracker.lock().unwrap().update(); }) .expect("Failed to spawn drainer thread"); state }); /// A named metric; it has a associated global mutable `f64` value. /// /// You can create these by hand, but you might find it more convenient to /// use the `metric!()` macro. pub struct Metric { pub name: &'static str, pub labels: Labels, } type Labels = Vec<(&'static str, Box<dyn fmt::Display + Send>)>; enum Action { Inc(f64), Set(f64), Min(f64), Max(f64), } impl Metric { /// Set the metric to the specified value. #[inline] pub fn set(self, x: f64) { send_chan((self, Action::Set(x))); } /// Increment the metric by the specified amount. #[inline] pub fn add(self, x: f64) { send_chan((self, Action::Inc(x))); } /// Set the metric to the specified value if it is smaller than the /// current value. 
#[inline] pub fn min(self, x: f64) { send_chan((self, Action::Min(x))); } /// Set the metric to the specified value if it is larger than the /// current value. #[inline] pub fn max(self, x: f64) { send_chan((self, Action::Max(x))); } } #[inline] fn send_chan(x: (Metric, Action)) { // If there's no HTTP thread, drop the update // If the HTTP thread dies then the Receiver will be dropped and // send() will return an error. This is fine, so we ignore it. let _ = STATE.chan.send(x); } /// Refer to a metric. #[macro_export] macro_rules! metric { ($name:ident) => { $crate::Metric { name: stringify!($name), labels: Vec::new(), } }; ($name:ident{$($key:ident = $val:expr),*}) => { $crate::Metric { name: stringify!($name), labels: vec![$((stringify!($key), Box::new($val))),*], } }; } #[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] struct LabelsDisplay(BTreeMap<&'static str, String>); impl LabelsDisplay { fn new(labels: Labels) -> LabelsDisplay { let labels = labels .into_iter() .map(|(k, v)| (k, v.to_string())) .collect::<BTreeMap<_, _>>(); LabelsDisplay(labels) } } impl fmt::Display for LabelsDisplay { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut labels = self.0.iter(); if let Some(head) = labels.next() { f.write_str("{")?; f.write_str(head.0)?; f.write_str("=")?; fmt::Debug::fmt(head.1, f)?; for (k, v) in labels { f.write_str(",")?; f.write_str(k)?; f.write_str("=")?; fmt::Debug::fmt(v, f)?; } f.write_str("}")?; } Ok(()) } } /// Tracks the current state of the metrics struct Tracker { /// Stores the current value of all the metrics in a map. (We use a /// BTreeMap so the metrics are nicely sorted when we print them.) 
metrics: BTreeMap<(&'static str, LabelsDisplay), f64>, chan: Receiver<(Metric, Action)>, } impl Tracker { fn update(&mut self) { let mut n = 0.; for (metric, action) in self.chan.try_iter() { let labels = LabelsDisplay::new(metric.labels); let entry = self.metrics.entry((metric.name, labels)).or_insert(0.0); match action { Action::Inc(x) => *entry += x, Action::Set(x) => *entry = x, Action::Min(x) => *entry = entry.min(x), Action::Max(x) => *entry = entry.max(x), } n += 1.; } let total_updates = ("epimetheus_total_updates", LabelsDisplay::new(vec![])); let total_flushes = ("epimetheus_total_flushes", LabelsDisplay::new(vec![])); *self.metrics.entry(total_updates).or_insert(0.) += n; *self.metrics.entry(total_flushes).or_insert(0.) += 1.; } } /// Get the current state of the metrics. /// /// ``` /// use epimetheus::metric; /// metric!(a_metric).set(42.0); /// assert_eq!( /// epimetheus::query().next(), /// Some(("a_metric".to_string(), 42.0)) /// ); /// ``` pub fn query() -> impl Iterator<Item = (String, f64)> { let mut tracker = STATE.tracker.lock().unwrap(); tracker.update(); let metrics = tracker.metrics.clone(); metrics .into_iter() .map(|((name, labels), val)| (format!("{}{}", name, labels), val)) } /// Spawn a thread which serves metrics over HTTP. /// /// By default the HTTP server runs on port 9898, but you can change this by /// setting the `RUST_METRICS_PORT` environment variable. Tip: If you want to /// specify the metrics port in your application itself, you can do so like this: /// /// ``` /// std::env::set_var("RUST_METRICS_PORT", "1234"); /// epimetheus::spawn_http_server(); /// ``` pub fn spawn_http_server() { let port = std::env::var("RUST_METRICS_PORT") .ok() .and_then(|x| x.parse::<u16>().ok()) .unwrap_or(9898); if let Err(e) = try_spawn_http_server_on(port) { warn!("HTTP thread failed to start: {}", e); } } /// Bind a socket and listen for incoming connections from HTTP clients. /// When we get a connection, we: /// /// 1. 
drain any updates from the channel and apply them to the global /// metrics map; then /// 2. render the map to prometheus exposition format and send it to /// the client. fn try_spawn_http_server_on(port: u16) -> std::io::Result<()> { let sock = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?; info!("Listening on port {}", port); std::thread::Builder::new() .name("epimetheus-http".into()) .spawn(move || loop { for conn in sock.incoming() { let mut tracker = STATE.tracker.lock().unwrap(); tracker.update(); if let Err(e) = conn.and_then(|conn| handle_http_client(conn, &tracker.metrics)) { warn!("{}", e); } } })?; Ok(()) } /// This is the world's simplest HTTP implementation. It completely /// ignores the request, unconditionally sending the same response. /// This response comes with no headers or anything - just a body. fn handle_http_client( mut conn: TcpStream, metrics: &BTreeMap<(&'static str, LabelsDisplay), f64>, ) -> Result<(), std::io::Error> { // We don't care about the request, but some HTTP clients get // upset if you don't at least read it. Unfortunate. 
let mut buf = [0; 128]; loop { // conn is non-blocking, so we need to retry if we get EAGAIN match conn.read(&mut buf) { Ok(_) => break, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => (), Err(e) => return Err(e), } } writeln!(conn, "HTTP/1.1 200 OK\r\n")?; for ((name, labels), val) in metrics { writeln!(conn, "{}{} {}", name, labels, val)?; } Ok(()) } #[cfg(test)] mod tests { use crate::*; #[test] fn test_non_http() -> Result<(), Box<dyn std::error::Error>> { metric!(foo).set(1.0); metric!(bar).add(1.0); metric!(bar).add(2.0); assert_eq!(query().find(|(k, _)| k == "foo").map(|x| x.1), Some(1.0)); assert_eq!(query().find(|(k, _)| k == "bar").map(|x| x.1), Some(3.0)); assert_eq!(query().find(|(k, _)| k == "qux"), None); metric!(bar).max(1.5); assert_eq!(query().find(|(k, _)| k == "bar").map(|x| x.1), Some(3.0)); metric!(bar).min(1.5); assert_eq!(query().find(|(k, _)| k == "bar").map(|x| x.1), Some(1.5)); Ok(()) } }