use std::sync::Arc;
use axum::response::Response;
use futures::future::BoxFuture;
use crate::web::context::RequestContext;
/// Callable handed to [`Interceptor::around`] as its `next` argument.
///
/// Internally this is a reference-counted, type-erased closure that takes a
/// [`RequestContext`] by value and returns a boxed `'static` future resolving
/// to a [`Response`].
pub struct NextHandler {
    inner: Arc<dyn Fn(RequestContext) -> BoxFuture<'static, Response> + Send + Sync>,
}

impl NextHandler {
    /// Wraps the closure `f` in an `Arc` and stores it as the callable.
    ///
    /// Hidden from rustdoc output via `#[doc(hidden)]`.
    #[doc(hidden)]
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(RequestContext) -> BoxFuture<'static, Response> + Send + Sync + 'static,
    {
        // Erase the concrete closure type at the point of allocation.
        let inner: Arc<dyn Fn(RequestContext) -> BoxFuture<'static, Response> + Send + Sync> =
            Arc::new(f);
        Self { inner }
    }

    /// Invokes the wrapped closure with `ctx` and returns the future it produces.
    ///
    /// The returned future is not polled here; the caller awaits it.
    #[inline]
    pub fn run(&self, ctx: RequestContext) -> BoxFuture<'static, Response> {
        let handler = &*self.inner;
        handler(ctx)
    }

    /// Produces another `NextHandler` sharing the same underlying closure.
    ///
    /// Only the `Arc` reference count is bumped; the closure is not copied.
    #[doc(hidden)]
    #[inline]
    pub fn __clone_for_chain(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
/// Hook that wraps the handling of a single request.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait Interceptor: Send + Sync + 'static {
/// Handles `ctx`, with `next` available to produce the downstream response.
///
/// The receiver is `&'static self`, so the interceptor value must be
/// reachable through a `'static` reference. `ctx` and `next` are taken by
/// value, and the result is a boxed `'static` future resolving to the
/// [`Response`] that should be returned for this request.
fn around(
&'static self,
ctx: RequestContext,
next: NextHandler,
) -> BoxFuture<'static, Response>;
}
/// Interceptor that prints one line per request to stdout containing the
/// method, path, numeric status code and elapsed time in microseconds.
pub struct LatencyLog;

impl Interceptor for LatencyLog {
    /// Runs `next` with `ctx`, prints the timing line, and returns the
    /// response unchanged.
    fn around(
        &'static self,
        ctx: RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        let timed = async move {
            // Copy out what the log line needs before `ctx` is moved into `next`.
            let method = ctx.method().clone();
            let path = ctx.path().to_owned();

            let timer = std::time::Instant::now();
            let response = next.run(ctx).await;
            let micros = timer.elapsed().as_micros();

            let code = response.status().as_u16();
            println!("[arcly] {method} {path} -> {} in {micros}\u{00B5}s", code);

            response
        };
        Box::pin(timed)
    }
}
/// Interceptor that emits one structured `tracing` event per request.
///
/// The event carries `trace_id`, `span_id`, `method`, `path`, `status` and
/// `duration_ms`. Its level is `error` for status >= 500, `warn` for
/// status >= 400, and `info` otherwise.
pub struct TelemetryLog;

impl Interceptor for TelemetryLog {
    /// Runs `next` with `ctx`, logs the outcome, and returns the response
    /// unchanged.
    fn around(
        &'static self,
        ctx: RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        let logged = async move {
            // Everything read from `ctx` is captured up front because `ctx`
            // is moved into `next.run` below.
            // The local names below double as the tracing field keys, so
            // they must not be renamed.
            let method = ctx.method().to_string();
            let path = ctx.path().to_owned();
            let trace_id = crate::web::context::hex_encode_16(&ctx.trace_id());
            let span_id = crate::web::context::hex_encode_8(&ctx.span_id());

            let timer = std::time::Instant::now();
            let response = next.run(ctx).await;
            let status = response.status().as_u16();
            let duration_ms = timer.elapsed().as_millis() as u64;

            match status {
                500.. => {
                    tracing::error!(
                        trace_id,
                        span_id,
                        method,
                        path,
                        status,
                        duration_ms,
                        "request failed"
                    );
                }
                400..=499 => {
                    tracing::warn!(
                        trace_id,
                        span_id,
                        method,
                        path,
                        status,
                        duration_ms,
                        "request error"
                    );
                }
                _ => {
                    tracing::info!(
                        trace_id,
                        span_id,
                        method,
                        path,
                        status,
                        duration_ms,
                        "request completed"
                    );
                }
            }

            response
        };
        Box::pin(logged)
    }
}
/// Interceptor that wraps each request in an OpenTelemetry server span
/// obtained from the global `"arcly-http"` tracer, records request metrics,
/// and emits a structured `tracing` event with the outcome.
pub struct TraceInterceptor;
/// Owns a span and calls `end()` on it when the guard is dropped, so the
/// span is ended on every exit path of the scope that holds the guard.
///
/// The `Span` bound stays on the struct itself because a `Drop` impl may not
/// carry bounds that the type definition lacks.
struct SpanGuard<S>(S)
where
    S: opentelemetry::trace::Span;

impl<S> Drop for SpanGuard<S>
where
    S: opentelemetry::trace::Span,
{
    fn drop(&mut self) {
        opentelemetry::trace::Span::end(&mut self.0);
    }
}
impl Interceptor for TraceInterceptor {
    /// Runs `next` inside a server-kind OpenTelemetry span, then records
    /// metrics and logs the result.
    ///
    /// The span is named `"{method} {route}"` and is started with the parent
    /// context derived from `ctx`. It carries `http.request.method`,
    /// `http.route` and `http.response.status_code` attributes, and is
    /// marked as an error for status >= 500. The response is returned
    /// unchanged.
    fn around(
        &'static self,
        ctx: RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        let traced = async move {
            use crate::observability::metrics::record_request;
            use crate::observability::otel::parent_context_from;
            use crate::web::context::{hex_encode_16, hex_encode_8};
            use opentelemetry::trace::{Span, SpanKind, Status, Tracer};
            use opentelemetry::KeyValue;

            // Captured before `ctx` is moved into `next.run`.
            // `method`, `route`, `trace_id`, `span_id`, `status` and `duration_ms`
            // double as tracing field keys, so they must not be renamed.
            let method = ctx.method().to_string();
            let route = ctx.route().to_owned();
            let trace_id = hex_encode_16(&ctx.trace_id());
            let span_id = hex_encode_8(&ctx.span_id());
            let timer = std::time::Instant::now();

            let parent = parent_context_from(&ctx);
            let tracer = opentelemetry::global::tracer("arcly-http");
            let builder = tracer
                .span_builder(format!("{method} {route}"))
                .with_kind(SpanKind::Server);
            // The guard ends the span when it goes out of scope, including
            // when this future is dropped before completing.
            let mut guard = SpanGuard(builder.start_with_context(&tracer, &parent));
            guard
                .0
                .set_attribute(KeyValue::new("http.request.method", method.clone()));
            guard
                .0
                .set_attribute(KeyValue::new("http.route", route.clone()));

            let response = next.run(ctx).await;

            let status = response.status().as_u16();
            let took = timer.elapsed();
            let duration_ms = took.as_millis() as u64;

            guard
                .0
                .set_attribute(KeyValue::new("http.response.status_code", status as i64));
            if status >= 500 {
                guard.0.set_status(Status::error("server error"));
            }

            record_request(&route, &method, status, took.as_secs_f64());

            match status {
                500.. => {
                    tracing::error!(
                        trace_id,
                        span_id,
                        method,
                        route,
                        status,
                        duration_ms,
                        "request failed"
                    );
                }
                400..=499 => {
                    tracing::warn!(
                        trace_id,
                        span_id,
                        method,
                        route,
                        status,
                        duration_ms,
                        "request error"
                    );
                }
                _ => {
                    tracing::info!(
                        trace_id,
                        span_id,
                        method,
                        route,
                        status,
                        duration_ms,
                        "request completed"
                    );
                }
            }

            response
        };
        Box::pin(traced)
    }
}
/// Interceptor that wraps successful response bodies in a `{"data": ...}`
/// JSON envelope.
///
/// Responses whose status is not a success (2xx) are returned untouched.
/// For successful responses the body is buffered in full, parsed as JSON and
/// re-emitted as `{"data": <parsed>}`. The original status and headers are
/// kept, except `Content-Length`, which is dropped because the body length
/// changes.
pub struct EnvelopeResponse;

impl Interceptor for EnvelopeResponse {
    /// Runs `next` with `ctx` and envelopes the body of a successful response.
    ///
    /// If the body cannot be read, or is not valid JSON (including an empty
    /// body), the envelope is `{"data": null}`.
    fn around(
        &'static self,
        ctx: RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        Box::pin(async move {
            use axum::body::to_bytes;
            use axum::http::header::CONTENT_LENGTH;

            let resp = next.run(ctx).await;
            if !resp.status().is_success() {
                return resp;
            }

            let (mut parts, body) = resp.into_parts();

            // Best effort: a failed read yields empty bytes, which then fail
            // to parse and fall through to `null` below.
            let bytes = to_bytes(body, usize::MAX).await.unwrap_or_default();
            let inner: serde_json::Value =
                serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null);

            let wrapped = serde_json::json!({ "data": inner });
            let payload = serde_json::to_vec(&wrapped)
                .expect("serializing a serde_json::Value to bytes cannot fail");

            // The enveloped body is longer than the original, so a
            // Content-Length set upstream would now be wrong. Remove it
            // rather than send a header that disagrees with the body.
            parts.headers.remove(CONTENT_LENGTH);

            Response::from_parts(parts, axum::body::Body::from(payload))
        })
    }
}