Skip to main content

api_gateway/middleware/
access_log.rs

1//! Structured HTTP access log middleware.
2//!
3//! Emits one `tracing::info!` event per completed request with the
4//! following fields:
5//! `pid`, `request_id`, `trace_id`, `method`, `uri`, `remote_addr`,
6//! `remote_addr_ip`, `remote_addr_port`, `content_length`, `user_agent`,
7//! `duration_ms`, `duration` (µs), `status`, `bytes_sent`.
8
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use axum::{body::Body, extract::ConnectInfo, middleware::Next, response::Response};
14use bytes::Bytes;
15use http_body::Frame;
16
17use super::request_id::XRequestId;
18
19/// Middleware that emits a structured access log line for every HTTP request.
20///
21/// Must be placed **inside** the `TraceLayer` span (so the event inherits
22/// trace context) and **outside** business middleware (auth, rate-limit, etc.)
23/// so that the logged status reflects all middleware processing.
24///
25/// The log is emitted once the response body has been fully streamed (or
26/// dropped), so `bytes_sent` reflects actual bytes written — including
27/// chunked-transfer and SSE responses that lack a `Content-Length` header.
28pub async fn access_log_middleware(req: axum::extract::Request, next: Next) -> Response {
29    let start = std::time::Instant::now();
30
31    // --- Request-phase data capture ---
32
33    let method = req.method().to_string();
34    let uri = req.uri().path_and_query().map_or_else(
35        || req.uri().path().to_owned(),
36        std::string::ToString::to_string,
37    );
38
39    let content_length: u64 = req
40        .headers()
41        .get(axum::http::header::CONTENT_LENGTH)
42        .and_then(|v| v.to_str().ok())
43        .and_then(|s| s.parse().ok())
44        .unwrap_or(0);
45
46    let user_agent = req
47        .headers()
48        .get(axum::http::header::USER_AGENT)
49        .and_then(|v| v.to_str().ok())
50        .unwrap_or("")
51        .to_owned();
52
53    let request_id = req
54        .extensions()
55        .get::<XRequestId>()
56        .map_or_else(String::new, |x| x.0.clone());
57
58    let trace_id = req
59        .headers()
60        .get(modkit_http::otel::TRACEPARENT)
61        .and_then(|v| v.to_str().ok())
62        .and_then(modkit_http::otel::parse_trace_id)
63        .unwrap_or_default();
64
65    let (remote_addr, remote_addr_ip, remote_addr_port) = req
66        .extensions()
67        .get::<ConnectInfo<SocketAddr>>()
68        .map(|ci| {
69            let addr = ci.0;
70            (addr.to_string(), addr.ip().to_string(), addr.port())
71        })
72        .unwrap_or_default();
73
74    // --- Invoke downstream ---
75
76    let response = next.run(req).await;
77
78    // --- Response-phase data ---
79
80    let status = response.status().as_u16();
81
82    // Wrap the response body in a counting wrapper so that `bytes_sent`
83    // reflects the actual number of bytes streamed to the client —
84    // including chunked-transfer and SSE responses that have no
85    // Content-Length header.
86    let log_ctx = AccessLogContext {
87        start,
88        pid: std::process::id(),
89        request_id,
90        trace_id,
91        method,
92        uri,
93        remote_addr,
94        remote_addr_ip,
95        remote_addr_port,
96        content_length,
97        user_agent,
98        status,
99    };
100
101    let (parts, body) = response.into_parts();
102    let counting_body = CountingBody {
103        inner: body,
104        bytes_sent: 0,
105        log_ctx: Some(log_ctx),
106    };
107    Response::from_parts(parts, Body::new(counting_body))
108}
109
/// Snapshot of the request/response metadata carried alongside the response
/// body until the access log event can be emitted.
struct AccessLogContext {
    /// Instant captured when the middleware started handling the request;
    /// durations are measured from here.
    start: std::time::Instant,
    /// OS process id of this server process.
    pid: u32,
    /// Request id taken from the `XRequestId` extension (empty if absent).
    request_id: String,
    /// Trace id parsed from the incoming `traceparent` header (default if
    /// absent or unparsable).
    trace_id: String,
    /// HTTP method, as text.
    method: String,
    /// Request path including the query string when present.
    uri: String,
    /// Peer socket address rendered as a single string (empty if unknown).
    remote_addr: String,
    /// IP component of the peer address (empty if unknown).
    remote_addr_ip: String,
    /// Port component of the peer address (0 if unknown).
    remote_addr_port: u16,
    /// Request `Content-Length` header value (0 if absent or invalid).
    content_length: u64,
    /// Request `User-Agent` header value (empty if absent or not UTF-8).
    user_agent: String,
    /// Numeric HTTP status code of the response.
    status: u16,
}
125
126impl AccessLogContext {
127    fn emit(self, bytes_sent: u64) {
128        let elapsed = self.start.elapsed();
129        let duration_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
130        let duration_micros = u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX);
131
132        tracing::info!(
133            target: "access_log",
134            msg = "response completed",
135            pid = self.pid,
136            request_id = %self.request_id,
137            trace_id = %self.trace_id,
138            method = %self.method,
139            uri = %self.uri,
140            remote_addr = %self.remote_addr,
141            remote_addr_ip = %self.remote_addr_ip,
142            remote_addr_port = self.remote_addr_port,
143            content_length = self.content_length,
144            user_agent = %self.user_agent,
145            duration_ms = duration_ms,
146            duration = duration_micros,
147            status = self.status,
148            bytes_sent = bytes_sent,
149        );
150    }
151}
152
153/// A body wrapper that counts bytes as frames are streamed, then emits
154/// the access log once the body is fully consumed or dropped.
155struct CountingBody {
156    inner: Body,
157    bytes_sent: u64,
158    log_ctx: Option<AccessLogContext>,
159}
160
161impl http_body::Body for CountingBody {
162    type Data = Bytes;
163    type Error = axum::Error;
164
165    fn poll_frame(
166        self: Pin<&mut Self>,
167        cx: &mut Context<'_>,
168    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
169        let this = self.get_mut();
170        let inner = Pin::new(&mut this.inner);
171
172        match inner.poll_frame(cx) {
173            Poll::Pending => Poll::Pending,
174            Poll::Ready(None) => {
175                // Body stream finished — emit the access log.
176                if let Some(ctx) = this.log_ctx.take() {
177                    ctx.emit(this.bytes_sent);
178                }
179                Poll::Ready(None)
180            }
181            Poll::Ready(Some(Ok(frame))) => {
182                if let Some(data) = frame.data_ref() {
183                    this.bytes_sent = this.bytes_sent.saturating_add(data.len() as u64);
184                }
185                Poll::Ready(Some(Ok(frame)))
186            }
187            Poll::Ready(Some(Err(e))) => {
188                // Body stream errored — emit the log now rather than
189                // deferring to Drop so timing is accurate.
190                if let Some(ctx) = this.log_ctx.take() {
191                    ctx.emit(this.bytes_sent);
192                }
193                Poll::Ready(Some(Err(e)))
194            }
195        }
196    }
197
198    fn is_end_stream(&self) -> bool {
199        self.inner.is_end_stream()
200    }
201
202    fn size_hint(&self) -> http_body::SizeHint {
203        self.inner.size_hint()
204    }
205}
206
207impl Drop for CountingBody {
208    fn drop(&mut self) {
209        // If the body is dropped before the stream completes (e.g. client
210        // disconnect), still emit the access log with whatever we counted.
211        if let Some(ctx) = self.log_ctx.take() {
212            ctx.emit(self.bytes_sent);
213        }
214    }
215}