use crate::protocol::problem::ProblemTelemetry;
use axum::{
body::Body,
extract::MatchedPath,
http::{Request, Version},
middleware::Next,
response::Response,
};
use std::time::Instant;
use tracing::{Instrument, Span, field, info_span};
pub async fn track_requests(request: Request<Body>, next: Next) -> Response {
let method = request.method().clone();
let path = request.uri().path().to_string();
let query = request.uri().query().map(ToOwned::to_owned);
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str)
.map(ToOwned::to_owned);
let route = matched_path.as_deref();
let live_mode = query
.as_deref()
.and_then(|query| query_param(query, "live"))
.map(ToOwned::to_owned);
let stream_id = route
.and_then(|route| resource_id_from_path(route, &path))
.filter(|stream_id| !stream_id.is_empty());
let host = forwarded_or_host(&request);
let client_address = forwarded_for(&request);
let user_agent = request
.headers()
.get("user-agent")
.and_then(|value| value.to_str().ok())
.map(ToOwned::to_owned);
let operation = operation_name(method.as_str(), route, live_mode.as_deref());
let http_version = http_version(request.version());
let span = request_span(
operation,
method.as_str(),
&path,
query.as_deref(),
route,
live_mode.as_deref(),
stream_id.as_deref(),
host.as_deref(),
client_address.as_deref(),
user_agent.as_deref(),
http_version,
);
let started = Instant::now();
let response = next.run(request).instrument(span.clone()).await;
emit_response_event(&span, &response, started.elapsed());
response
}
fn request_span(
operation: &'static str,
method: &str,
path: &str,
query: Option<&str>,
route: Option<&str>,
live_mode: Option<&str>,
stream_id: Option<&str>,
server_address: Option<&str>,
client_address: Option<&str>,
user_agent: Option<&str>,
http_version: &'static str,
) -> Span {
let span = info_span!(
"durable_streams.server",
"ds.operation" = operation,
"ds.live_mode" = field::Empty,
"ds.stream_id" = field::Empty,
"ds.error_code" = field::Empty,
"ds.error_class" = field::Empty,
"ds.storage.backend" = field::Empty,
"ds.storage.operation" = field::Empty,
"http.request.method" = method,
"http.route" = field::Empty,
"http.response.status_code" = field::Empty,
"http.response.header.retry_after" = field::Empty,
"url.path" = path,
"url.query" = field::Empty,
"server.address" = field::Empty,
"client.address" = field::Empty,
"user_agent.original" = field::Empty,
"network.protocol.version" = http_version,
"event.duration" = field::Empty,
"error.type" = field::Empty,
"error.message" = field::Empty
);
if let Some(query) = query {
span.record("url.query", query);
}
if let Some(route) = route {
span.record("http.route", route);
}
if let Some(live_mode) = live_mode {
span.record("ds.live_mode", live_mode);
}
if let Some(stream_id) = stream_id {
span.record("ds.stream_id", stream_id);
}
if let Some(server_address) = server_address {
span.record("server.address", server_address);
}
if let Some(client_address) = client_address {
span.record("client.address", client_address);
}
if let Some(user_agent) = user_agent {
span.record("user_agent.original", user_agent);
}
span
}
fn emit_response_event(span: &Span, response: &Response, elapsed: std::time::Duration) {
span.record(
"http.response.status_code",
field::display(response.status().as_u16()),
);
span.record(
"event.duration",
u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
);
if let Some(problem) = response.extensions().get::<ProblemTelemetry>() {
span.record("ds.error_code", problem.code.as_str());
if let Some(error_class) = &problem.error_class {
span.record("ds.error_class", error_class.as_str());
}
if let Some(storage_backend) = &problem.storage_backend {
span.record("ds.storage.backend", storage_backend.as_str());
}
if let Some(storage_operation) = &problem.storage_operation {
span.record("ds.storage.operation", storage_operation.as_str());
}
if let Some(retry_after_secs) = problem.retry_after_secs {
span.record("http.response.header.retry_after", retry_after_secs);
}
span.record("error.type", problem.problem_type.as_str());
span.record(
"error.message",
problem
.internal_detail
.as_deref()
.or(problem.detail.as_deref())
.unwrap_or(problem.title.as_str()),
);
} else if response.status().is_server_error() {
span.record("error.type", "server_error");
span.record("error.message", "request failed");
} else if response.status().is_client_error() {
span.record("error.type", "client_error");
}
if response.status().is_server_error() {
tracing::error!("request failed");
} else if response.status().is_client_error() {
tracing::warn!("request completed with client error");
} else {
tracing::info!("request completed");
}
}
fn operation_name(method: &str, route: Option<&str>, live_mode: Option<&str>) -> &'static str {
if matches!(route, Some("/healthz")) {
return "health";
}
if matches!(route, Some("/readyz")) {
return "readiness";
}
if !route.is_some_and(has_resource_segment) {
return "request";
}
match (method, live_mode) {
("PUT", _) => "create",
("POST", _) => "append",
("HEAD", _) => "head",
("DELETE", _) => "delete",
("GET", Some("sse")) => "subscribe",
("GET", Some("long-poll")) => "poll",
("GET", _) => "read",
_ => "request",
}
}
fn has_resource_segment(route: &str) -> bool {
route
.split('/')
.filter(|segment| !segment.is_empty())
.any(is_route_param_segment)
}
fn resource_id_from_path<'a>(route: &str, path: &'a str) -> Option<&'a str> {
let parameter_index = route
.split('/')
.filter(|segment| !segment.is_empty())
.enumerate()
.filter_map(|(index, segment)| is_route_param_segment(segment).then_some(index))
.last()?;
path.split('/')
.filter(|segment| !segment.is_empty())
.nth(parameter_index)
}
fn is_route_param_segment(segment: &str) -> bool {
segment.starts_with('{') && segment.ends_with('}') && segment.len() > 2
}
fn query_param<'a>(query: &'a str, name: &str) -> Option<&'a str> {
query.split('&').find_map(|pair| {
let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
(key == name).then_some(value)
})
}
fn forwarded_or_host(request: &Request<Body>) -> Option<String> {
request
.headers()
.get("x-forwarded-host")
.and_then(|value| value.to_str().ok())
.map(ToOwned::to_owned)
.or_else(|| {
request
.headers()
.get("host")
.and_then(|value| value.to_str().ok())
.map(ToOwned::to_owned)
})
}
fn forwarded_for(request: &Request<Body>) -> Option<String> {
request
.headers()
.get("x-forwarded-for")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.split(',').next())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
}
fn http_version(version: Version) -> &'static str {
match version {
Version::HTTP_09 => "0.9",
Version::HTTP_10 => "1.0",
Version::HTTP_11 => "1.1",
Version::HTTP_2 => "2",
Version::HTTP_3 => "3",
_ => "unknown",
}
}
#[cfg(test)]
mod tests {
use super::{operation_name, query_param, resource_id_from_path};
#[test]
fn operation_name_distinguishes_live_read_modes() {
assert_eq!(
operation_name("GET", Some("/streams/{name}"), Some("long-poll")),
"poll"
);
assert_eq!(
operation_name("GET", Some("/streams/{name}"), Some("sse")),
"subscribe"
);
assert_eq!(operation_name("GET", Some("/streams/{name}"), None), "read");
assert_eq!(
operation_name("GET", Some("/documents/{uuid}"), None),
"read"
);
}
#[test]
fn operation_name_handles_nested_subresources() {
assert_eq!(
operation_name("GET", Some("/documents/{uuid}/slides/{slide_id}"), None),
"read"
);
assert_eq!(
operation_name("POST", Some("/documents/{uuid}/ack"), None),
"append"
);
}
#[test]
fn resource_id_from_path_uses_last_parameter_segment() {
assert_eq!(
resource_id_from_path("/documents/{uuid}", "/documents/doc-1"),
Some("doc-1")
);
assert_eq!(
resource_id_from_path(
"/documents/{doc_id}/slides/{slide_id}",
"/documents/doc-1/slides/slide-2"
),
Some("slide-2")
);
assert_eq!(
resource_id_from_path("/documents/{uuid}/ack", "/documents/doc-1/ack"),
Some("doc-1")
);
}
#[test]
fn query_param_extracts_named_values() {
assert_eq!(query_param("offset=now&live=sse", "live"), Some("sse"));
assert_eq!(query_param("offset=now", "live"), None);
}
}