#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
use super::memory_profiling;
#[cfg(feature = "memory-profiling")]
use super::pprof_symbol;
use crate::BootstrapResult;
#[cfg(feature = "metrics")]
use crate::telemetry::metrics;
use crate::telemetry::reexports::http_body_util::{BodyExt, Empty, Full, combinators::BoxBody};
use crate::telemetry::settings::TelemetrySettings;
#[cfg(feature = "tracing")]
use crate::telemetry::tracing;
use futures_util::future::{BoxFuture, FutureExt};
use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Method, Request, Response, StatusCode, header};
use percent_encoding::percent_decode_str;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
pub type TelemetryRouteBody = BoxBody<Bytes, crate::Error>;
pub type TelemetryRouteHandlerFuture =
BoxFuture<'static, Result<Response<TelemetryRouteBody>, Infallible>>;
pub type TelemetryRouteHandler = Box<
dyn Fn(Request<Incoming>, Arc<TelemetrySettings>) -> TelemetryRouteHandlerFuture
+ Send
+ Sync
+ 'static,
>;
type RouteHandlerShared = Arc<
dyn Fn(Request<Incoming>, Arc<TelemetrySettings>) -> TelemetryRouteHandlerFuture
+ Send
+ Sync
+ 'static,
>;
pub struct TelemetryServerRoute {
pub path: String,
pub methods: Vec<Method>,
pub handler: TelemetryRouteHandler,
}
struct Routes(HashMap<Method, matchit::Router<RouteHandlerShared>>);
impl Routes {
fn new(custom_routes: Vec<TelemetryServerRoute>) -> BootstrapResult<Self> {
let mut map = Self(Default::default());
map.init_built_in_routes()?;
for route in custom_routes {
map.set(route)?;
}
Ok(map)
}
fn init_built_in_routes(&mut self) -> BootstrapResult<()> {
self.set(TelemetryServerRoute {
path: "/health".into(),
methods: vec![Method::GET],
handler: Box::new(|_, _| async { into_response("text/plain", Ok("")) }.boxed()),
})?;
#[cfg(feature = "metrics")]
self.set(TelemetryServerRoute {
path: "/metrics".into(),
methods: vec![Method::GET],
handler: Box::new(|_, settings| {
async move {
into_response(
"application/openmetrics-text; version=1.0.0; charset=utf-8",
metrics::collect(&settings.metrics),
)
}
.boxed()
}),
})?;
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
self.set(TelemetryServerRoute {
path: "/pprof/heap".into(),
methods: vec![Method::GET],
handler: Box::new(|_, settings| {
async move {
into_response(
"application/x-gperftools-profile",
memory_profiling::heap_profile(settings).await,
)
}
.boxed()
}),
})?;
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
self.set(TelemetryServerRoute {
path: "/pprof/heap_stats".into(),
methods: vec![Method::GET],
handler: Box::new(|_, settings| {
async move {
into_response(
"text/plain; charset=utf-8",
memory_profiling::heap_stats(settings).await,
)
}
.boxed()
}),
})?;
#[cfg(feature = "memory-profiling")]
self.set(TelemetryServerRoute {
path: "/pprof/symbol".into(),
methods: vec![Method::GET, Method::POST],
handler: Box::new(|req, _| {
async move {
into_response(
"text/plain; charset=utf-8",
pprof_symbol::pprof_symbol(req).await,
)
}
.boxed()
}),
})?;
#[cfg(feature = "tracing")]
self.set(TelemetryServerRoute {
path: "/debug/traces".into(),
methods: vec![Method::GET],
handler: Box::new(|_, _settings| {
async move {
into_response(
"application/json; charset=utf-8",
Ok(tracing::get_active_traces()),
)
}
.boxed()
}),
})?;
Ok(())
}
#[allow(unused_mut, reason = "conditional mutation")]
fn set(&mut self, mut route: TelemetryServerRoute) -> BootstrapResult<()> {
let handler = Arc::from(route.handler);
#[cfg(not(feature = "telemetry-server-pattern-routing"))]
{
route.path = route.path.replace('{', "{{").replace('}', "}}");
}
for method in route.methods {
let res = self
.0
.entry(method)
.or_default()
.insert(route.path.clone(), Arc::clone(&handler));
match res {
Ok(()) => {}
Err(matchit::InsertError::Conflict { with }) if with == route.path => {}
Err(e) => anyhow::bail!("tried to insert route `{}`, but {}", &route.path, e),
}
}
Ok(())
}
}
#[derive(Clone)]
pub(super) struct Router {
routes: Arc<Routes>,
settings: Arc<TelemetrySettings>,
}
impl Router {
pub(super) fn new(
custom_routes: Vec<TelemetryServerRoute>,
settings: Arc<TelemetrySettings>,
) -> BootstrapResult<Self> {
Ok(Self {
routes: Arc::new(Routes::new(custom_routes)?),
settings,
})
}
async fn handle_request(&self, req: Request<Incoming>) -> Response<TelemetryRouteBody> {
let res = Response::builder();
let Ok(path) = percent_decode_str(req.uri().path()).decode_utf8() else {
return res
.status(StatusCode::BAD_REQUEST)
.body(BoxBody::new(
Full::from("can't percent-decode URI path as valid UTF-8").map_err(Into::into),
))
.unwrap();
};
let Some(handler) = self
.routes
.0
.get(req.method())
.and_then(|r| Some(r.at(&path).ok()?.value))
else {
return res
.status(StatusCode::NOT_FOUND)
.body(BoxBody::new(Empty::new().map_err(Into::into)))
.unwrap();
};
match (handler)(req, Arc::clone(&self.settings)).await {
Ok(res) => res,
Err(e) => match e {},
}
}
}
impl Service<Request<Incoming>> for Router {
type Response = Response<TelemetryRouteBody>;
type Error = Infallible;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
let router = self.clone();
async move { Ok(router.handle_request(req).await) }.boxed()
}
}
fn into_response(
content_type: &str,
res: crate::Result<impl Into<Full<Bytes>>>,
) -> Result<Response<TelemetryRouteBody>, Infallible> {
Ok(match res {
Ok(data) => Response::builder()
.header(header::CONTENT_TYPE, content_type)
.body(BoxBody::new(data.into().map_err(Into::into)))
.unwrap(),
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(BoxBody::new(
Full::from(err.to_string()).map_err(Into::into),
))
.unwrap(),
})
}