instrumented/
lib.rs

1//! # Instrumented
2//!
3//! `instrumented` provides an attribute macro that enables instrumentation of
4//! functions for use with Prometheus.
5//!
6//! This crate is largely based on the [`log-derive`](https://docs.rs/log-derive/) crate, and
7//! inspired by the [`metered`](https://docs.rs/metered/) crate.
8//!
9//! To get started, add the [`instrumented_codegen::instrument`] proc macro to any function you
//! want to instrument. Have a look in the `example` directory for a complete usage example.
11//!
12//! ## Configuring
13//!
14//! You can specify the global metrics prefix with the `METRICS_PREFIX` env var,
15//! and provide default labels with the `METRICS_LABELS` env var, which accepts a
//! comma-separated list of `label=value` pairs. For example:
17//!
18//! ```shell
19//! METRICS_PREFIX=myapp
20//! METRICS_LABELS=app=myapp,env=prod,region=us
21//! ```
22//!
23//! ## Example
24//!
25//! ```rust
26//! extern crate instrumented;
27//! extern crate log;
28//! extern crate reqwest;
29//!
30//! use instrumented::instrument;
31//!
32//! // Logs at warn level with the `special` context.
33//! #[instrument(WARN, ctx = "special")]
34//! fn my_func() {
35//!     use std::{thread, time};
36//!     let ten_millis = time::Duration::from_millis(10);
37//!     thread::sleep(ten_millis);
38//! }
39//!
40//! #[derive(Debug)]
41//! pub struct MyError;
42//!
43//! // Logs result at info level
44//! #[instrument(INFO)]
45//! fn my_func_with_ok_result() -> Result<String, MyError> {
46//!     use std::{thread, time};
47//!     let ten_millis = time::Duration::from_millis(10);
48//!     thread::sleep(ten_millis);
49//!
50//!     Ok(String::from("hello world"))
51//! }
52//!
53//! // Logs result at debug level
54//! #[instrument(DEBUG)]
55//! fn my_func_with_err_result() -> Result<String, MyError> {
56//!     use std::{thread, time};
57//!     let ten_millis = time::Duration::from_millis(10);
58//!     thread::sleep(ten_millis);
59//!
60//!     Err(MyError)
61//! }
62//!
63//! fn main() {
64//!     let addr = "127.0.0.1:5000".to_string();
65//!     instrumented::init(&addr);
66//!
67//!     my_func();
68//!     assert_eq!(my_func_with_ok_result().is_ok(), true);
69//!     assert_eq!(my_func_with_err_result().is_err(), true);
70//!
71//!     let body = reqwest::get(&format!("http://{}/metrics", addr))
72//!         .unwrap()
73//!         .text()
74//!         .unwrap();
75//!
76//!     println!("{}", body);
77//! }
78//! ```
79#[macro_use]
80extern crate lazy_static;
81#[macro_use]
82extern crate log;
83extern crate hyper;
84#[allow(unused_imports)]
85#[macro_use]
86extern crate instrumented_codegen;
87
88/// Codegen crate
89pub use instrumented_codegen::instrument;
90
/// Re-export of the `rust-prometheus` crate.
///
/// Everything exported by the `prometheus` crate is made available under
/// this module through a glob re-export.
pub mod prometheus {
    extern crate prometheus;
    /// `rust-prometheus` crate
    pub use self::prometheus::*;
}
97
98use hyper::http::StatusCode;
99use hyper::rt::Future;
100use hyper::service::service_fn_ok;
101use hyper::{Body, Request, Response, Server};
102
103#[cfg(all(target_os = "linux"))]
104fn register_default_process_collector(
105    reg: &crate::prometheus::Registry,
106) -> crate::prometheus::Result<()> {
107    use crate::prometheus::process_collector::ProcessCollector;
108
109    let pc = ProcessCollector::for_self();
110    reg.register(Box::new(pc))
111}
112
113lazy_static! {
114    static ref INSTRUMENTED_REGISTRY: ::prometheus::Registry = {
115        use std::collections::HashMap;
116
117        let prefix =  match std::env::var("METRICS_PREFIX")  {
118            Ok(value) => Some(value.to_string()),
119            Err(_) => None,
120        };
121        let labels =  match std::env::var("METRICS_LABELS")  {
122            Ok(value) => {
123                let mut labels = HashMap::new();
124                value.split(',').for_each(|s| {let v: Vec<&str> = s.splitn(2, '=').collect();
125                if v.len() ==2 {
126                    labels.insert(v[0].to_owned(), v[1].to_owned());
127                }});
128                Some(labels)
129            },
130            Err(_) => None,
131        };
132
133        #[allow(clippy::let_and_return)]
134        let reg = ::prometheus::Registry::new_custom(prefix, labels).unwrap();
135
136        // Register a default process collector.
137        #[cfg(all(target_os = "linux"))]
138        register_default_process_collector(&reg).unwrap();
139
140        reg
141    };
142    static ref FUNC_CALLED: prometheus::IntCounterVec = {
143        let counter_opts = prometheus::Opts::new(
144            "function_called_total",
145            "Number of times a function was called",
146        );
147        let counter = prometheus::IntCounterVec::new(counter_opts, &["type","name","ctx"]).unwrap();
148
149        INSTRUMENTED_REGISTRY
150            .register(Box::new(counter.clone())).unwrap();
151
152        counter
153    };
154    static ref FUNC_ERRORS: prometheus::IntCounterVec = {
155        let counter_opts = prometheus::Opts::new(
156            "function_error_total",
157            "Number of times the result of a function was an error",
158        );
159        let counter = prometheus::IntCounterVec::new(counter_opts, &["type","name","ctx","err"]).unwrap();
160
161        INSTRUMENTED_REGISTRY
162            .register(Box::new(counter.clone())).unwrap();
163
164        counter
165    };
166    static ref FUNC_TIMER: prometheus::HistogramVec = {
167        let histogram_opts = prometheus::HistogramOpts::new(
168            "function_time_seconds",
169            "Histogram of function call times observed",
170        );
171        let histogram = prometheus::HistogramVec::new(histogram_opts, &["type","name","ctx"]).unwrap();
172
173        INSTRUMENTED_REGISTRY
174            .register(Box::new(histogram.clone())).unwrap();
175
176        histogram
177    };
178    static ref FUNC_INFLIGHT: prometheus::IntGaugeVec = {
179        let gauge_opts = prometheus::Opts::new(
180            "function_calls_inflight_total",
181            "Number of function calls currently in flight",
182        );
183        let gauge = prometheus::IntGaugeVec::new(gauge_opts, &["type","name","ctx"]).unwrap();
184
185        INSTRUMENTED_REGISTRY
186            .register(Box::new(gauge.clone())).unwrap();
187
188        gauge
189    };
190}
191
192#[doc(hidden)]
193pub fn inc_called_counter_for(name: &'static str, ctx: &'static str) {
194    FUNC_CALLED
195        .with_label_values(&["func_call", name, ctx])
196        .inc();
197}
198
199#[doc(hidden)]
200pub fn inc_error_counter_for(name: &'static str, ctx: &'static str, err: String) {
201    FUNC_ERRORS
202        .with_label_values(&["func_call", name, ctx, &err])
203        .inc();
204}
205
206#[doc(hidden)]
207pub fn get_timer_for(name: &'static str, ctx: &'static str) -> prometheus::HistogramTimer {
208    FUNC_TIMER
209        .with_label_values(&["func_call", name, ctx])
210        .start_timer()
211}
212
213#[doc(hidden)]
214pub fn inc_inflight_for(name: &'static str, ctx: &'static str) {
215    FUNC_INFLIGHT
216        .with_label_values(&["func_call", name, ctx])
217        .inc();
218}
219
220#[doc(hidden)]
221pub fn dec_inflight_for(name: &'static str, ctx: &'static str) {
222    FUNC_INFLIGHT
223        .with_label_values(&["func_call", name, ctx])
224        .dec();
225}
226
227/// Initializes the metrics context, and starts an HTTP server
228/// to serve metrics.
229pub fn init(addr: &str) {
230    let parsed_addr = addr.parse().unwrap();
231    let server = Server::bind(&parsed_addr)
232        .serve(|| {
233            // This is the `Service` that will handle the connection.
234            // `service_fn_ok` is a helper to convert a function that
235            // returns a Response into a `Service`.
236            service_fn_ok(move |req: Request<Body>| {
237                use crate::prometheus::*;
238                if req.uri().path() == "/metrics" {
239                    let metric_families = INSTRUMENTED_REGISTRY.gather();
240                    let mut buffer = vec![];
241                    let encoder = TextEncoder::new();
242                    encoder.encode(&metric_families, &mut buffer).unwrap();
243
244                    Response::builder()
245                        .status(StatusCode::OK)
246                        .header("Content-Type", encoder.format_type())
247                        .body(Body::from(buffer))
248                        .expect("Error constructing response")
249                } else {
250                    Response::builder()
251                        .status(StatusCode::NOT_FOUND)
252                        .body(Body::from("Not found."))
253                        .expect("Error constructing response")
254                }
255            })
256        })
257        .map_err(|e| error!("server error: {}", e));
258
259    info!("Exporting metrics at http://{}/metrics", addr);
260
261    let mut rt = tokio::runtime::Builder::new()
262        .core_threads(1) // one thread is sufficient
263        .build()
264        .expect("Unable to build metrics exporter tokio runtime");
265
266    std::thread::spawn(move || {
267        rt.spawn(server);
268        rt.shutdown_on_idle().wait().unwrap();
269    });
270}
271
272/// Register a collector with the global registry.
273pub fn register(c: Box<dyn::prometheus::core::Collector>) -> ::prometheus::Result<()> {
274    INSTRUMENTED_REGISTRY.register(c)
275}