api_gateway/middleware/
access_log.rs1use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use axum::{body::Body, extract::ConnectInfo, middleware::Next, response::Response};
14use bytes::Bytes;
15use http_body::Frame;
16
17use super::request_id::XRequestId;
18
19pub async fn access_log_middleware(req: axum::extract::Request, next: Next) -> Response {
29 let start = std::time::Instant::now();
30
31 let method = req.method().to_string();
34 let uri = req.uri().path_and_query().map_or_else(
35 || req.uri().path().to_owned(),
36 std::string::ToString::to_string,
37 );
38
39 let content_length: u64 = req
40 .headers()
41 .get(axum::http::header::CONTENT_LENGTH)
42 .and_then(|v| v.to_str().ok())
43 .and_then(|s| s.parse().ok())
44 .unwrap_or(0);
45
46 let user_agent = req
47 .headers()
48 .get(axum::http::header::USER_AGENT)
49 .and_then(|v| v.to_str().ok())
50 .unwrap_or("")
51 .to_owned();
52
53 let request_id = req
54 .extensions()
55 .get::<XRequestId>()
56 .map_or_else(String::new, |x| x.0.clone());
57
58 let trace_id = req
59 .headers()
60 .get(modkit_http::otel::TRACEPARENT)
61 .and_then(|v| v.to_str().ok())
62 .and_then(modkit_http::otel::parse_trace_id)
63 .unwrap_or_default();
64
65 let (remote_addr, remote_addr_ip, remote_addr_port) = req
66 .extensions()
67 .get::<ConnectInfo<SocketAddr>>()
68 .map(|ci| {
69 let addr = ci.0;
70 (addr.to_string(), addr.ip().to_string(), addr.port())
71 })
72 .unwrap_or_default();
73
74 let response = next.run(req).await;
77
78 let status = response.status().as_u16();
81
82 let log_ctx = AccessLogContext {
87 start,
88 pid: std::process::id(),
89 request_id,
90 trace_id,
91 method,
92 uri,
93 remote_addr,
94 remote_addr_ip,
95 remote_addr_port,
96 content_length,
97 user_agent,
98 status,
99 };
100
101 let (parts, body) = response.into_parts();
102 let counting_body = CountingBody {
103 inner: body,
104 bytes_sent: 0,
105 log_ctx: Some(log_ctx),
106 };
107 Response::from_parts(parts, Body::new(counting_body))
108}
109
/// Per-request data collected by the middleware and held until the access-log
/// entry is written.
struct AccessLogContext {
    /// Instant taken when the middleware started handling the request.
    start: std::time::Instant,
    /// Id of the current process.
    pid: u32,
    /// Value of the `XRequestId` request extension; empty when absent.
    request_id: String,
    /// Trace id parsed from the `traceparent` header; empty when unavailable.
    trace_id: String,
    /// HTTP method of the request.
    method: String,
    /// Request path and query string.
    uri: String,
    /// Peer socket address as text; empty when connection info is missing.
    remote_addr: String,
    /// IP part of the peer address as text; empty when connection info is missing.
    remote_addr_ip: String,
    /// Port part of the peer address; `0` when connection info is missing.
    remote_addr_port: u16,
    /// Request `Content-Length` header value; `0` when missing or unparsable.
    content_length: u64,
    /// Request `User-Agent` header value; empty when missing.
    user_agent: String,
    /// Numeric HTTP status code of the response.
    status: u16,
}
125
126impl AccessLogContext {
127 fn emit(self, bytes_sent: u64) {
128 let elapsed = self.start.elapsed();
129 let duration_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
130 let duration_micros = u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX);
131
132 tracing::info!(
133 target: "access_log",
134 msg = "response completed",
135 pid = self.pid,
136 request_id = %self.request_id,
137 trace_id = %self.trace_id,
138 method = %self.method,
139 uri = %self.uri,
140 remote_addr = %self.remote_addr,
141 remote_addr_ip = %self.remote_addr_ip,
142 remote_addr_port = self.remote_addr_port,
143 content_length = self.content_length,
144 user_agent = %self.user_agent,
145 duration_ms = duration_ms,
146 duration = duration_micros,
147 status = self.status,
148 bytes_sent = bytes_sent,
149 );
150 }
151}
152
153struct CountingBody {
156 inner: Body,
157 bytes_sent: u64,
158 log_ctx: Option<AccessLogContext>,
159}
160
161impl http_body::Body for CountingBody {
162 type Data = Bytes;
163 type Error = axum::Error;
164
165 fn poll_frame(
166 self: Pin<&mut Self>,
167 cx: &mut Context<'_>,
168 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
169 let this = self.get_mut();
170 let inner = Pin::new(&mut this.inner);
171
172 match inner.poll_frame(cx) {
173 Poll::Pending => Poll::Pending,
174 Poll::Ready(None) => {
175 if let Some(ctx) = this.log_ctx.take() {
177 ctx.emit(this.bytes_sent);
178 }
179 Poll::Ready(None)
180 }
181 Poll::Ready(Some(Ok(frame))) => {
182 if let Some(data) = frame.data_ref() {
183 this.bytes_sent = this.bytes_sent.saturating_add(data.len() as u64);
184 }
185 Poll::Ready(Some(Ok(frame)))
186 }
187 Poll::Ready(Some(Err(e))) => {
188 if let Some(ctx) = this.log_ctx.take() {
191 ctx.emit(this.bytes_sent);
192 }
193 Poll::Ready(Some(Err(e)))
194 }
195 }
196 }
197
198 fn is_end_stream(&self) -> bool {
199 self.inner.is_end_stream()
200 }
201
202 fn size_hint(&self) -> http_body::SizeHint {
203 self.inner.size_hint()
204 }
205}
206
207impl Drop for CountingBody {
208 fn drop(&mut self) {
209 if let Some(ctx) = self.log_ctx.take() {
212 ctx.emit(self.bytes_sent);
213 }
214 }
215}