use std::sync::Arc;
use std::time::{Duration, Instant};
use log::{error, warn};
use warp::http::HeaderMap;
use warp::{self, Filter, Reply};
use crate::i2pcontrol::rpc::RpcCallError;
use crate::i2pcontrol::I2pControlClient;
use crate::metrics::encode_metrics_text;
use crate::version;
fn effective_timeout(headers: &HeaderMap, hard_max: Duration) -> Option<Duration> {
const MARGIN: f64 = 0.5;
const MARGIN_THRESHOLD: f64 = 3.0;
let secs = headers
.get("X-Prometheus-Scrape-Timeout-Seconds")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<f64>().ok())
.filter(|v| v.is_finite())?;
let adjusted = if secs > MARGIN_THRESHOLD {
secs - MARGIN
} else {
secs
};
let adjusted = adjusted.max(0.1);
let capped = adjusted.min(hard_max.as_secs_f64());
Some(Duration::from_secs_f64(capped))
}
const OM_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
fn choose_content_type(headers: &HeaderMap) -> &'static str {
match headers.get("Accept").and_then(|v| v.to_str().ok()) {
Some(accept) => {
let a = accept.to_ascii_lowercase();
if a.contains("application/openmetrics-text") || a.contains("*/*") {
OM_CONTENT_TYPE
} else {
OM_CONTENT_TYPE
}
}
None => OM_CONTENT_TYPE,
}
}
pub async fn metrics_handler(
st: Arc<I2pControlClient>,
headers: HeaderMap,
) -> Result<impl warp::Reply, warp::Rejection> {
let t0 = Instant::now();
let Some(effective_timeout) = effective_timeout(&headers, st.max_scrape_timeout) else {
let msg = "missing or invalid X-Prometheus-Scrape-Timeout-Seconds header".to_string();
let reply = warp::reply::with_status(msg, warp::http::StatusCode::BAD_REQUEST);
let reply = warp::reply::with_header(reply, "Content-Type", choose_content_type(&headers));
let reply = warp::reply::with_header(reply, "Cache-Control", "no-store");
return Ok(reply);
};
let (status_code, router_data, scrape_error) = match tokio::time::timeout(
effective_timeout,
st.fetch_router_info(effective_timeout),
)
.await
{
Err(_elapsed) => {
warn!(
"Scrape timed out; effective budget {:.3}s",
effective_timeout.as_secs_f64()
);
(warp::http::StatusCode::GATEWAY_TIMEOUT, None, 1u8)
}
Ok(Ok(data)) => (warp::http::StatusCode::OK, Some(data), 0u8),
Ok(Err(err)) => {
error!("Failed to fetch metrics: {}", err);
let status = if let Some(rpc) = err.downcast_ref::<RpcCallError>() {
match rpc {
RpcCallError::Transport(e) if e.is_timeout() => {
warp::http::StatusCode::GATEWAY_TIMEOUT
}
_ => warp::http::StatusCode::INTERNAL_SERVER_ERROR,
}
} else if let Some(ioe) = err.downcast_ref::<std::io::Error>() {
if ioe.kind() == std::io::ErrorKind::TimedOut {
warp::http::StatusCode::GATEWAY_TIMEOUT
} else {
warp::http::StatusCode::INTERNAL_SERVER_ERROR
}
} else {
warp::http::StatusCode::INTERNAL_SERVER_ERROR
};
(status, None, 1u8)
}
};
let scrape_seconds = t0.elapsed().as_secs_f64();
let body = encode_metrics_text(
router_data.as_ref(),
scrape_seconds,
Some(effective_timeout.as_secs_f64()),
scrape_error,
version::VERSION,
);
let reply = warp::reply::with_status(body, status_code);
let reply = warp::reply::with_header(reply, "Content-Type", choose_content_type(&headers));
let reply = warp::reply::with_header(reply, "Cache-Control", "no-store");
Ok(reply)
}
pub async fn metrics_handler_response(
st: Arc<I2pControlClient>,
headers: HeaderMap,
) -> Result<warp::reply::Response, warp::Rejection> {
let r = metrics_handler(st, headers).await?;
Ok(r.into_response())
}
pub fn routes(
state: Arc<I2pControlClient>,
) -> impl Filter<Extract = (warp::reply::Response,), Error = warp::Rejection> + Clone {
let route_metrics = warp::path("metrics")
.and(warp::path::end())
.and(warp::get())
.and(warp::any().map(move || state.clone()))
.and(warp::header::headers_cloned())
.and_then(metrics_handler_response);
let route_404 = warp::path::end().map(|| {
warp::reply::with_status("Not Found", warp::http::StatusCode::NOT_FOUND).into_response()
});
route_metrics.or(route_404).unify()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn timeout_no_header_is_none() {
let headers = HeaderMap::new();
assert!(effective_timeout(&headers, Duration::from_secs(60)).is_none());
}
#[test]
fn timeout_smaller_header_wins_with_margin_above_threshold() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Prometheus-Scrape-Timeout-Seconds",
"3.1".parse().unwrap(),
);
let eff = effective_timeout(&headers, Duration::from_secs(60)).unwrap();
assert!((eff.as_secs_f64() - 2.6).abs() < 1e-9);
}
#[test]
fn timeout_large_header_is_capped_by_max() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Prometheus-Scrape-Timeout-Seconds",
"30.0".parse().unwrap(),
);
let eff = effective_timeout(&headers, Duration::from_secs(10)).unwrap();
assert!((eff.as_secs_f64() - 10.0).abs() < 1e-9);
}
#[test]
fn timeout_small_header_kept_no_margin() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Prometheus-Scrape-Timeout-Seconds",
"0.2".parse().unwrap(),
);
let eff = effective_timeout(&headers, Duration::from_secs(60)).unwrap();
assert!((eff.as_secs_f64() - 0.2).abs() < 1e-9);
}
#[test]
fn timeout_header_negative_value_clamped_to_min() {
let mut headers = HeaderMap::new();
headers.insert("X-Prometheus-Scrape-Timeout-Seconds", "-5".parse().unwrap());
let eff = effective_timeout(&headers, Duration::from_secs(60)).unwrap();
assert!((eff.as_secs_f64() - 0.1).abs() < 1e-9);
}
#[test]
fn timeout_header_non_numeric_is_none() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Prometheus-Scrape-Timeout-Seconds",
"not-a-number".parse().unwrap(),
);
assert!(effective_timeout(&headers, Duration::from_secs(60)).is_none());
}
}