arcly_http/web/
interceptors.rs1use std::sync::Arc;
13
14use axum::response::Response;
15use futures::future::BoxFuture;
16
17use crate::web::context::RequestContext;
18
19pub struct NextHandler {
22 inner: Arc<dyn Fn(RequestContext) -> BoxFuture<'static, Response> + Send + Sync>,
23}
24
25impl NextHandler {
26 #[doc(hidden)]
27 pub fn new<F>(f: F) -> Self
28 where
29 F: Fn(RequestContext) -> BoxFuture<'static, Response> + Send + Sync + 'static,
30 {
31 Self { inner: Arc::new(f) }
32 }
33
34 #[inline]
35 pub fn run(&self, ctx: RequestContext) -> BoxFuture<'static, Response> {
36 (self.inner)(ctx)
37 }
38
39 #[doc(hidden)]
42 #[inline]
43 pub fn __clone_for_chain(&self) -> Self {
44 Self {
45 inner: Arc::clone(&self.inner),
46 }
47 }
48}
49
50pub trait Interceptor: Send + Sync + 'static {
56 fn around(
57 &'static self,
58 ctx: RequestContext,
59 next: NextHandler,
60 ) -> BoxFuture<'static, Response>;
61}
62
63pub struct LatencyLog;
67impl Interceptor for LatencyLog {
68 fn around(
69 &'static self,
70 ctx: RequestContext,
71 next: NextHandler,
72 ) -> BoxFuture<'static, Response> {
73 Box::pin(async move {
74 let method = ctx.method().clone();
75 let path = ctx.path().to_owned();
76 let started = std::time::Instant::now();
77 let resp = next.run(ctx).await;
78 let micros = started.elapsed().as_micros();
79 println!(
80 "[arcly] {method} {path} -> {} in {micros}\u{00B5}s",
81 resp.status().as_u16()
82 );
83 resp
84 })
85 }
86}
87
88pub struct TelemetryLog;
97
98impl Interceptor for TelemetryLog {
99 fn around(
100 &'static self,
101 ctx: RequestContext,
102 next: NextHandler,
103 ) -> BoxFuture<'static, Response> {
104 Box::pin(async move {
105 use crate::web::context::{hex_encode_16, hex_encode_8};
106 let method = ctx.method().to_string();
107 let path = ctx.path().to_owned();
108 let trace_id = hex_encode_16(&ctx.trace_id());
109 let span_id = hex_encode_8(&ctx.span_id());
110 let started = std::time::Instant::now();
111
112 let resp = next.run(ctx).await;
113
114 let status = resp.status().as_u16();
115 let duration_ms = started.elapsed().as_millis() as u64;
116
117 if status >= 500 {
118 tracing::error!(
119 trace_id,
120 span_id,
121 method,
122 path,
123 status,
124 duration_ms,
125 "request failed"
126 );
127 } else if status >= 400 {
128 tracing::warn!(
129 trace_id,
130 span_id,
131 method,
132 path,
133 status,
134 duration_ms,
135 "request error"
136 );
137 } else {
138 tracing::info!(
139 trace_id,
140 span_id,
141 method,
142 path,
143 status,
144 duration_ms,
145 "request completed"
146 );
147 }
148
149 resp
150 })
151 }
152}
153
154pub struct TraceInterceptor;
166
167struct SpanGuard<S: opentelemetry::trace::Span>(S);
170
171impl<S: opentelemetry::trace::Span> Drop for SpanGuard<S> {
172 fn drop(&mut self) {
173 self.0.end();
174 }
175}
176
177impl Interceptor for TraceInterceptor {
178 fn around(
179 &'static self,
180 ctx: RequestContext,
181 next: NextHandler,
182 ) -> BoxFuture<'static, Response> {
183 Box::pin(async move {
184 use crate::observability::metrics::record_request;
185 use crate::observability::otel::parent_context_from;
186 use crate::web::context::{hex_encode_16, hex_encode_8};
187 use opentelemetry::trace::{Span, SpanKind, Status, Tracer};
188 use opentelemetry::KeyValue;
189
190 let method = ctx.method().to_string();
191 let route = ctx.route().to_owned();
192 let trace_id = hex_encode_16(&ctx.trace_id());
193 let span_id = hex_encode_8(&ctx.span_id());
194 let started = std::time::Instant::now();
195
196 let parent_cx = parent_context_from(&ctx);
198 let tracer = opentelemetry::global::tracer("arcly-http");
199 let span_name = format!("{method} {route}");
200 let mut span = SpanGuard(
201 tracer
202 .span_builder(span_name)
203 .with_kind(SpanKind::Server)
204 .start_with_context(&tracer, &parent_cx),
205 );
206 span.0
207 .set_attribute(KeyValue::new("http.request.method", method.clone()));
208 span.0
209 .set_attribute(KeyValue::new("http.route", route.clone()));
210
211 let resp = next.run(ctx).await;
213
214 let status = resp.status().as_u16();
216 let elapsed = started.elapsed();
217 let duration_ms = elapsed.as_millis() as u64;
218
219 span.0
220 .set_attribute(KeyValue::new("http.response.status_code", status as i64));
221 if status >= 500 {
222 span.0.set_status(Status::error("server error"));
223 }
224
225 record_request(&route, &method, status, elapsed.as_secs_f64());
227
228 if status >= 500 {
230 tracing::error!(
231 trace_id,
232 span_id,
233 method,
234 route,
235 status,
236 duration_ms,
237 "request failed"
238 );
239 } else if status >= 400 {
240 tracing::warn!(
241 trace_id,
242 span_id,
243 method,
244 route,
245 status,
246 duration_ms,
247 "request error"
248 );
249 } else {
250 tracing::info!(
251 trace_id,
252 span_id,
253 method,
254 route,
255 status,
256 duration_ms,
257 "request completed"
258 );
259 }
260
261 resp
262 })
263 }
264}
265
266pub struct EnvelopeResponse;
269impl Interceptor for EnvelopeResponse {
270 fn around(
271 &'static self,
272 ctx: RequestContext,
273 next: NextHandler,
274 ) -> BoxFuture<'static, Response> {
275 Box::pin(async move {
276 use axum::body::to_bytes;
277 let resp = next.run(ctx).await;
278 if !resp.status().is_success() {
279 return resp;
280 }
281 let (parts, body) = resp.into_parts();
282 let bytes = to_bytes(body, usize::MAX).await.unwrap_or_default();
283 let inner: serde_json::Value =
284 serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null);
285 let wrapped = serde_json::json!({ "data": inner });
286 let body = axum::body::Body::from(serde_json::to_vec(&wrapped).unwrap());
287 Response::from_parts(parts, body)
288 })
289 }
290}