#![deny(missing_docs)]
#![forbid(unsafe_code)]
#![warn(clippy::pedantic)]
#![warn(clippy::unwrap_used)]
#![warn(rust_2018_idioms, unused_lifetimes, missing_debug_implementations)]
pub use prometheus;
#[cfg(feature = "internal_metrics")]
use crate::prometheus::{
register_histogram,
register_int_counter,
register_int_gauge,
Histogram,
IntCounter,
IntGauge,
};
#[cfg(feature = "internal_metrics")]
use lazy_static::lazy_static;
#[cfg(feature = "logging")]
use log::{
error,
info,
};
use crate::prometheus::{
Encoder,
TextEncoder,
};
use crossbeam_utils::sync::WaitGroup;
use std::{
net::SocketAddr,
sync::{
atomic::{
AtomicBool,
Ordering,
},
mpsc::{
sync_channel,
Receiver,
SyncSender,
},
Arc,
Mutex,
MutexGuard,
},
thread,
time::Duration,
};
use thiserror::Error;
use tiny_http::{
Header,
Request,
Response,
Server as HTTPServer,
};
#[cfg(feature = "internal_metrics")]
lazy_static! {
static ref HTTP_COUNTER: IntCounter = register_int_counter!(
"prometheus_exporter_requests_total",
"Number of HTTP requests received."
)
.expect("can not create HTTP_COUNTER metric. this should never fail");
static ref HTTP_BODY_GAUGE: IntGauge = register_int_gauge!(
"prometheus_exporter_response_size_bytes",
"The HTTP response sizes in bytes."
)
.expect("can not create HTTP_BODY_GAUGE metric. this should never fail");
static ref HTTP_REQ_HISTOGRAM: Histogram = register_histogram!(
"prometheus_exporter_request_duration_seconds",
"The HTTP request latencies in seconds."
)
.expect("can not create HTTP_REQ_HISTOGRAM metric. this should never fail");
}
#[derive(Debug, Error)]
pub enum Error {
#[error("can not start http server: {0}")]
ServerStart(Box<dyn std::error::Error + Send + Sync + 'static>),
}
#[derive(Debug, Error)]
enum HandlerError {
#[error("can not encode metrics: {0}")]
EncodeMetrics(prometheus::Error),
#[error("can not generate response: {0}")]
Response(std::io::Error),
}
#[derive(Debug)]
pub struct Builder {
binding: SocketAddr,
}
#[derive(Debug)]
pub struct Exporter {
request_receiver: Receiver<WaitGroup>,
is_waiting: Arc<AtomicBool>,
update_lock: Arc<Mutex<()>>,
}
#[derive(Debug)]
struct Server {}
pub fn start(binding: SocketAddr) -> Result<Exporter, Error> {
Builder::new(binding).start()
}
impl Builder {
#[must_use]
pub fn new(binding: SocketAddr) -> Builder {
Self { binding }
}
pub fn start(self) -> Result<Exporter, Error> {
let (request_sender, request_receiver) = sync_channel(0);
let is_waiting = Arc::new(AtomicBool::new(false));
let update_lock = Arc::new(Mutex::new(()));
let exporter = Exporter {
request_receiver,
is_waiting: Arc::clone(&is_waiting),
update_lock: Arc::clone(&update_lock),
};
Server::start(self.binding, request_sender, is_waiting, update_lock)?;
Ok(exporter)
}
}
impl Exporter {
#[must_use]
pub fn builder(binding: SocketAddr) -> Builder {
Builder::new(binding)
}
#[must_use = "not using the guard will result in the exporter returning the prometheus data \
immediately over http"]
pub fn wait_request(&self) -> MutexGuard<'_, ()> {
self.is_waiting.store(true, Ordering::SeqCst);
let update_waitgroup = self
.request_receiver
.recv()
.expect("can not receive from request_receiver channel. this should never happen");
self.is_waiting.store(false, Ordering::SeqCst);
let guard = self
.update_lock
.lock()
.expect("poisioned mutex. should never happen");
update_waitgroup.wait();
guard
}
#[must_use = "not using the guard will result in the exporter returning the prometheus data \
immediately over http"]
pub fn wait_duration(&self, duration: Duration) -> MutexGuard<'_, ()> {
thread::sleep(duration);
self.update_lock
.lock()
.expect("poisioned mutex. should never happen")
}
}
impl Server {
fn start(
binding: SocketAddr,
request_sender: SyncSender<WaitGroup>,
is_waiting: Arc<AtomicBool>,
update_lock: Arc<Mutex<()>>,
) -> Result<(), Error> {
let server = HTTPServer::http(&binding).map_err(Error::ServerStart)?;
thread::spawn(move || {
#[cfg(feature = "logging")]
info!("exporting metrics to http://{}/metrics", binding);
let encoder = TextEncoder::new();
for request in server.incoming_requests() {
if let Err(err) = match request.url() {
"/metrics" => Self::handler_metrics(
request,
&encoder,
&request_sender,
&is_waiting,
&update_lock,
),
_ => Self::handler_redirect(request),
} {
#[cfg(feature = "logging")]
error!("{}", err);
drop(err)
}
}
});
Ok(())
}
fn handler_metrics(
request: Request,
encoder: &TextEncoder,
request_sender: &SyncSender<WaitGroup>,
is_waiting: &Arc<AtomicBool>,
update_lock: &Arc<Mutex<()>>,
) -> Result<(), HandlerError> {
#[cfg(feature = "internal_metrics")]
HTTP_COUNTER.inc();
#[cfg(feature = "internal_metrics")]
let _timer = HTTP_REQ_HISTOGRAM.start_timer();
if is_waiting.load(Ordering::SeqCst) {
let wg = WaitGroup::new();
request_sender
.send(wg.clone())
.expect("can not send to request_sender. this should never happen");
wg.wait();
}
let _lock = update_lock
.lock()
.expect("poisioned mutex. should never happen");
Self::process_request(request, encoder)
}
fn process_request(request: Request, encoder: &TextEncoder) -> Result<(), HandlerError> {
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder
.encode(&metric_families, &mut buffer)
.map_err(HandlerError::EncodeMetrics)?;
#[cfg(feature = "internal_metrics")]
HTTP_BODY_GAUGE.set(buffer.len() as i64);
let response = Response::from_data(buffer);
request.respond(response).map_err(HandlerError::Response)?;
Ok(())
}
fn handler_redirect(request: Request) -> Result<(), HandlerError> {
let response = Response::from_string("try /metrics for metrics\n".to_string())
.with_status_code(301)
.with_header(Header {
field: "Location"
.parse()
.expect("can not parse location header field. this should never fail"),
value: ascii::AsciiString::from_ascii("/metrics")
.expect("can not parse header value. this should never fail"),
});
request.respond(response).map_err(HandlerError::Response)?;
Ok(())
}
}
use crossbeam_channel::{
bounded,
Receiver as OldReceiver,
Sender,
};
use hyper::{
rt::{
self,
Future,
},
service::service_fn_ok,
Body,
Error as HyperError,
Method,
Response as OldResponse,
Server as OldServer,
};
use std::{
error::Error as OldError,
fmt,
};
#[derive(Debug)]
pub enum StartError {
HyperError(HyperError),
}
impl From<HyperError> for StartError {
fn from(err: HyperError) -> Self {
StartError::HyperError(err)
}
}
impl fmt::Display for StartError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl OldError for StartError {
fn cause(&self) -> Option<&dyn OldError> {
match self {
StartError::HyperError(err) => Some(err),
}
}
}
#[derive(Debug)]
#[deprecated(since = "0.6.2", note = "Use `prometheus_exporter::Exporter` instead")]
pub struct PrometheusExporter;
#[derive(Debug)]
#[deprecated(
since = "0.6.2",
note = "Not needed anymore when using prometheus_exporter::Exporter"
)]
pub struct Update;
#[derive(Debug)]
#[deprecated(
since = "0.6.2",
note = "Not needed anymore when using prometheus_exporter::Exporter"
)]
pub struct FinishedUpdate;
#[allow(deprecated)]
impl PrometheusExporter {
#[deprecated(
since = "0.6.2",
note = "Use `prometheus_exporter::Exporter::start` instead"
)]
pub fn run(addr: &SocketAddr) -> Result<(), StartError> {
let service = move || {
let encoder = TextEncoder::new();
service_fn_ok(move |req| match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => PrometheusExporter::send_metrics(&encoder),
_ => PrometheusExporter::send_redirect(),
})
};
let server = OldServer::try_bind(&addr)?
.serve(service)
.map_err(|e| log_serving_error(&e));
log_startup(&addr);
rt::run(server);
Ok(())
}
#[must_use]
#[deprecated(
since = "0.6.2",
note = "Use `prometheus_exporter::Exporter::wait_request` instead"
)]
pub fn run_and_notify(addr: SocketAddr) -> (OldReceiver<Update>, Sender<FinishedUpdate>) {
let (update_sender, update_receiver) = bounded(0);
let (finished_sender, finished_receiver) = bounded(0);
thread::spawn(move || {
let service = move || {
let encoder = TextEncoder::new();
let update_sender = update_sender.clone();
let finished_receiver = finished_receiver.clone();
service_fn_ok(move |req| match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
update_sender
.send(Update {})
.expect("can not send update. this should never happen");
finished_receiver
.recv()
.expect("can not receive finish. this should never happen");
PrometheusExporter::send_metrics(&encoder)
}
_ => PrometheusExporter::send_redirect(),
})
};
let server = OldServer::bind(&addr)
.serve(service)
.map_err(|e| log_serving_error(&e));
log_startup(&addr);
rt::run(server);
});
(update_receiver, finished_sender)
}
#[must_use]
#[deprecated(
since = "0.6.2",
note = "Use `prometheus_exporter::Exporter::wait_duration` instead"
)]
pub fn run_and_repeat(
addr: SocketAddr,
duration: std::time::Duration,
) -> (OldReceiver<Update>, Sender<FinishedUpdate>) {
let (update_sender, update_receiver) = bounded(0);
let (finished_sender, finished_receiver) = bounded(0);
thread::spawn(move || {
let service = move || {
let encoder = TextEncoder::new();
service_fn_ok(move |req| match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => PrometheusExporter::send_metrics(&encoder),
_ => PrometheusExporter::send_redirect(),
})
};
let server = OldServer::bind(&addr)
.serve(service)
.map_err(|e| log_serving_error(&e));
log_startup(&addr);
rt::run(server);
});
{
thread::spawn(move || loop {
thread::sleep(duration);
update_sender
.send(Update {})
.expect("can not send update. this should never happen");
finished_receiver
.recv()
.expect("can not receive finish. this should never happen");
});
}
(update_receiver, finished_sender)
}
fn send_metrics(encoder: &TextEncoder) -> OldResponse<Body> {
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder
.encode(&metric_families, &mut buffer)
.expect("can not encode metrics");
OldResponse::new(Body::from(buffer))
}
fn send_redirect() -> OldResponse<Body> {
let message = "try /metrics for metrics\n";
OldResponse::builder()
.status(301)
.body(Body::from(message))
.expect("can not build response")
}
}
#[allow(unused)]
fn log_startup(addr: &SocketAddr) {
#[cfg(feature = "log")]
info!("Listening on http://{}", addr);
}
#[allow(unused)]
fn log_serving_error(error: &HyperError) {
#[cfg(feature = "log")]
error!("problem while serving metrics: {}", error)
}