use std::future::{Future, Ready, ready};
use actix_web::{
ResponseError,
body::MessageBody,
dev::{Service, ServiceRequest, ServiceResponse, Transform},
http::{Method, StatusCode, Version},
};
use opentelemetry::propagation::Extractor;
use tracing::Span;
use uuid::Uuid;
fn http_method_str(method: &Method) -> std::borrow::Cow<'static, str> {
match method {
&Method::OPTIONS => "OPTIONS".into(),
&Method::GET => "GET".into(),
&Method::POST => "POST".into(),
&Method::PUT => "PUT".into(),
&Method::DELETE => "DELETE".into(),
&Method::HEAD => "HEAD".into(),
&Method::TRACE => "TRACE".into(),
&Method::CONNECT => "CONNECT".into(),
&Method::PATCH => "PATCH".into(),
other => other.to_string().into(),
}
}
fn http_flavor(version: Version) -> std::borrow::Cow<'static, str> {
match version {
Version::HTTP_09 => "0.9".into(),
Version::HTTP_10 => "1.0".into(),
Version::HTTP_11 => "1.1".into(),
Version::HTTP_2 => "2.0".into(),
Version::HTTP_3 => "3.0".into(),
other => format!("{other:?}").into(),
}
}
fn http_scheme(scheme: &str) -> std::borrow::Cow<'static, str> {
match scheme {
"http" => "http".into(),
"https" => "https".into(),
other => other.to_string().into(),
}
}
pub(crate) struct RequestHeaderCarrier<'a> {
headers: &'a actix_web::http::header::HeaderMap,
}
impl<'a> RequestHeaderCarrier<'a> {
pub(crate) fn new(headers: &'a actix_web::http::header::HeaderMap) -> Self {
RequestHeaderCarrier { headers }
}
}
impl Extractor for RequestHeaderCarrier<'_> {
fn get(&self, key: &str) -> Option<&str> {
self.headers.get(key).and_then(|v| v.to_str().ok())
}
fn keys(&self) -> Vec<&str> {
self.headers.keys().map(|header| header.as_str()).collect()
}
}
fn set_otel_parent(req: &ServiceRequest, span: &Span) {
use opentelemetry::trace::TraceContextExt as _;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&RequestHeaderCarrier::new(req.headers()))
});
let _ = span.set_parent(parent_context);
let trace_id = {
let id = span.context().span().span_context().trace_id();
format!("{id:032x}")
};
span.record("trace_id", tracing::field::display(trace_id));
}
fn root_span(request: &ServiceRequest) -> Span {
let user_agent = request
.headers()
.get("User-Agent")
.map(|h| h.to_str().unwrap_or(""))
.unwrap_or("");
let http_route: std::borrow::Cow<'static, str> = request
.match_pattern()
.map(Into::into)
.unwrap_or_else(|| "default".into());
let http_method = http_method_str(request.method());
let connection_info = request.connection_info();
let request_id = Uuid::now_v7();
let span = tracing::span!(
tracing::Level::INFO,
"HTTP request",
http.method = %http_method,
http.route = %http_route,
http.flavor = %http_flavor(request.version()),
http.scheme = %http_scheme(connection_info.scheme()),
http.host = %connection_info.host(),
http.client_ip = %request.connection_info().realip_remote_addr().unwrap_or(""),
http.user_agent = %user_agent,
http.target = %request.uri().path_and_query().map(|p| p.as_str()).unwrap_or(""),
http.status_code = tracing::field::Empty,
otel.name = %format!("{} {}", http_method, http_route),
otel.kind = "server",
otel.status_code = tracing::field::Empty,
trace_id = tracing::field::Empty,
request_id = %request_id,
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
);
std::mem::drop(connection_info);
set_otel_parent(request, &span);
span
}
pub(crate) struct Log {
info: bool,
}
pub(crate) struct LogMiddleware<S> {
info: bool,
inner: S,
}
impl Log {
pub(crate) fn new(info: bool) -> Self {
Self { info }
}
}
#[derive(Debug)]
pub(crate) struct LogError {
info: bool,
span: Span,
error: actix_web::Error,
}
pin_project_lite::pin_project! {
pub(crate) struct LogFuture<F> {
info: bool,
span: Span,
#[pin]
inner: F,
}
}
struct LogMeta {
info: bool,
status: StatusCode,
span: Span,
}
pin_project_lite::pin_project! {
pub(crate) struct LogBody<B> {
meta: Option<LogMeta>,
#[pin]
inner: B,
}
}
impl<S, B> Transform<S, ServiceRequest> for Log
where
B: MessageBody,
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<LogBody<B>>;
type Error = actix_web::Error;
type InitError = ();
type Transform = LogMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(LogMiddleware {
info: self.info,
inner: service,
}))
}
}
impl<S, B> Service<ServiceRequest> for LogMiddleware<S>
where
B: MessageBody,
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<LogBody<B>>;
type Error = actix_web::Error;
type Future = LogFuture<S::Future>;
fn poll_ready(
&self,
ctx: &mut core::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(ctx).map(|res| {
res.map_err(|e| {
LogError {
info: self.info,
error: e.into(),
span: Span::none(),
}
.into()
})
})
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let span = root_span(&req);
LogFuture {
info: self.info,
inner: self.inner.call(req),
span,
}
}
}
impl<F, B, E> Future for LogFuture<F>
where
B: MessageBody,
F: Future<Output = Result<ServiceResponse<B>, E>>,
E: Into<actix_web::Error>,
{
type Output = Result<ServiceResponse<LogBody<B>>, actix_web::Error>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let info = self.info;
let span = self.span.clone();
let this = self.project();
span.in_scope(|| {
let span = Span::current();
std::task::Poll::Ready(match std::task::ready!(this.inner.poll(cx)) {
Ok(response) => {
let status = response.status();
if let Some(error) = response.response().error() {
handle_error(&span, status, error.as_response_error());
} else {
let code: i32 = response.response().status().as_u16().into();
span.record("http.status_code", code);
span.record("otel.status_code", "OK");
}
let log_meta = LogMeta { info, status, span };
let meta = if response.response().body().size().is_eof() {
log_meta.emit();
None
} else {
Some(log_meta)
};
Ok(response.map_body(|_, inner| LogBody { meta, inner }))
}
Err(e) => {
let error = e.into();
handle_error(
&span,
error.as_response_error().status_code(),
error.as_response_error(),
);
Err(LogError { info, error, span }.into())
}
})
})
}
}
impl<B> MessageBody for LogBody<B>
where
B: MessageBody,
{
type Error = B::Error;
fn size(&self) -> actix_web::body::BodySize {
self.inner.size()
}
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
let this = self.project();
let span = if let Some(meta) = &this.meta {
meta.span.clone()
} else {
Span::none()
};
let opt = std::task::ready!(span.in_scope(|| this.inner.poll_next(cx)));
if opt.is_none()
&& let Some(meta) = this.meta.take()
{
meta.emit();
}
std::task::Poll::Ready(opt)
}
}
impl std::fmt::Display for LogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("Log error wrapper")
}
}
impl std::error::Error for LogError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.error.source()
}
}
impl ResponseError for LogError {
fn status_code(&self) -> actix_web::http::StatusCode {
self.error.as_response_error().status_code()
}
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
let response = self.error.error_response();
let status = response.status();
let meta = LogMeta {
info: self.info,
status,
span: self.span.clone(),
};
if response.body().size().is_eof() {
meta.emit();
response
} else {
response.map_body(|_, inner| {
LogBody {
meta: Some(meta),
inner,
}
.boxed()
})
}
}
}
impl LogMeta {
fn emit(self) {
let span = self.span;
span.in_scope(|| {
if self.status.is_server_error() {
tracing::error!("server error");
} else if self.status.is_client_error() {
tracing::warn!("client error");
} else if self.status == actix_web::http::StatusCode::NOT_MODIFIED && self.info {
tracing::info!("completed");
} else if self.status == actix_web::http::StatusCode::NOT_MODIFIED {
tracing::debug!("completed");
} else if self.status.is_redirection() && self.info {
tracing::info!("redirected");
} else if self.status.is_redirection() {
tracing::debug!("redirected");
} else if self.info {
tracing::info!("completed");
} else {
tracing::debug!("completed");
}
})
}
}
fn handle_error(span: &Span, status_code: StatusCode, response_error: &dyn ResponseError) {
let display = format!("{response_error}");
let debug = format!("{response_error:?}");
span.record("exception.message", tracing::field::display(display));
span.record("exception.details", tracing::field::display(debug));
let code: i32 = status_code.as_u16().into();
span.record("http.status_code", code);
if status_code.is_client_error() {
span.record("otel.status_code", "OK");
} else {
span.record("otel.status_code", "ERROR");
}
}