use std::net::ToSocketAddrs;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use http::uri::PathAndQuery;
use log::{debug, error};
use time::OffsetDateTime;
use tiny_http::{ConfigListenAddr, Method, Response, Server};
use crate::error::Error;
pub const DEFAULT_PATH: &str = "/metrics";
pub struct MetricsServer {
shared: Arc<SharedData>,
thread: Option<thread::JoinHandle<()>>,
}
struct SharedData {
data: Mutex<Vec<u8>>,
server: Server,
stop: AtomicBool,
}
impl MetricsServer {
pub fn new<A>(addr: A) -> Result<Self, Error>
where
A: ToSocketAddrs,
{
let listener = ConfigListenAddr::from_socket_addrs(addr)?;
Self::build(tiny_http::ServerConfig {
addr: listener,
ssl: None,
})
}
#[cfg(feature = "tls")]
pub fn new_tls<A>(addr: A, certificate: Vec<u8>, private_key: Vec<u8>) -> Result<Self, Error>
where
A: ToSocketAddrs,
{
let listener = ConfigListenAddr::from_socket_addrs(addr)?;
Self::build(tiny_http::ServerConfig {
addr: listener,
ssl: Some(tiny_http::SslConfig {
certificate,
private_key,
}),
})
}
fn build(config: tiny_http::ServerConfig) -> Result<Self, Error> {
let server = Server::new(config)?;
let shared = Arc::new(SharedData {
data: Mutex::new(Vec::new()),
server,
stop: AtomicBool::new(false),
});
Ok(MetricsServer {
shared,
thread: None,
})
}
pub fn http<A>(addr: A) -> Self
where
A: ToSocketAddrs,
{
let mut server = MetricsServer::new(addr).unwrap();
server.serve();
server
}
#[cfg(feature = "tls")]
pub fn https<A>(addr: A, certificate: Vec<u8>, private_key: Vec<u8>) -> Self
where
A: ToSocketAddrs,
{
let mut server = MetricsServer::new_tls(addr, certificate, private_key).unwrap();
server.serve();
server
}
pub fn update(&self, data: Vec<u8>) -> usize {
let mut buf = self.shared.data.lock().unwrap();
*buf = data;
buf.len()
}
pub fn serve(&mut self) {
self.serve_uri(DEFAULT_PATH.to_string())
}
pub fn serve_uri(&mut self, path: String) {
if let Some(thread) = &self.thread {
if !thread.is_finished() {
debug!("metrics server already running, continuing");
return;
}
}
let path = parse_path(&path);
let s = Arc::clone(&self.shared);
self.thread = Some(thread::spawn({
move || {
for req in s.server.incoming_requests() {
if s.stop.load(Ordering::Relaxed) {
debug!("metrics server stopping");
return;
}
if req.url() != path {
let res = Response::empty(404);
respond(req, res);
continue;
}
if req.method() != &Method::Get {
let res = Response::empty(405);
respond(req, res);
continue;
}
let metrics = s.data.lock().unwrap();
let res = Response::from_data(metrics.as_slice());
respond(req, res);
}
}
}));
}
pub fn stop(mut self) -> Result<(), Error> {
self.shared.stop.store(true, Ordering::Relaxed);
self.shared.server.unblock();
if let Some(thread) = self.thread.take() {
thread.join()?;
}
Ok(())
}
}
fn parse_path(uri: &str) -> String {
match PathAndQuery::from_str(uri) {
Ok(pq) => {
let mut path = pq.path().to_lowercase();
if !path.starts_with('/') {
path.insert(0, '/');
}
path
}
Err(_) => {
error!("invalid uri, defaulting to {DEFAULT_PATH}");
DEFAULT_PATH.to_string()
}
}
}
fn respond<D>(req: tiny_http::Request, res: tiny_http::Response<D>)
where
D: std::io::Read,
{
let datetime = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "-".to_string());
debug!(
"{} [{}] \"{} {} HTTP/{}\" {}",
req.remote_addr().map_or("-".to_string(), |v| v.to_string()),
datetime,
req.method(),
req.url(),
req.http_version(),
res.status_code().0,
);
if let Err(e) = req.respond(res) {
error!("error sending metrics response: {e}");
};
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_path() {
let expected_default = DEFAULT_PATH.to_string();
let expected_valid = "/debug/metrics".to_string();
assert_eq!(parse_path("Hello, World!"), expected_default);
assert_eq!(parse_path(" metr ics "), expected_default);
assert_eq!(parse_path("/debug/metrics"), expected_valid);
assert_eq!(parse_path("debug/metrics"), expected_valid);
assert_eq!(parse_path("DEBUG/METRICS"), expected_valid);
}
}