Skip to main content

tako_rs_plugins/middleware/
access_log.rs

1//! Structured access log middleware.
2//!
3//! Emits one log line per request after the response is produced, separate
4//! from the metrics signal pipeline so operators can keep request-level audit
5//! trails even when metrics are disabled.
6//!
7//! Default sink writes through the `tracing` macros at INFO level
8//! (`target = "tako::access"`). Plug a custom sink via [`AccessLog::sink`] for
9//! JSON / OTLP / file rotation.
10//!
11//! Fields per record:
12//!
13//! - `method`, `path`, `version`
14//! - `status` (numeric)
15//! - `duration_us` (microseconds)
16//! - `request_id` if a [`RequestIdValue`] extension is present
17//! - `peer` (ip / unix / other) if a [`ConnInfo`] extension is present
18
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::Instant;
23
24use tako_rs_core::conn_info::ConnInfo;
25use tako_rs_core::conn_info::PeerAddr;
26use tako_rs_core::middleware::IntoMiddleware;
27use tako_rs_core::middleware::Next;
28use tako_rs_core::types::Request;
29use tako_rs_core::types::Response;
30
31use super::request_id::RequestIdValue;
32
/// Single access-log record handed to the sink.
///
/// One record is produced per request by the [`AccessLog`] middleware after
/// the downstream handler has returned its response. The type is comparable
/// (`PartialEq` / `Eq`) so custom sinks can be unit-tested by asserting on the
/// exact records they receive.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessRecord {
  /// Request method, rendered with `to_string()`.
  pub method: String,
  /// Path component of the request URI (the query string is not included).
  pub path: String,
  /// `Debug` rendering of the request's protocol version.
  pub version: String,
  /// Numeric status code of the response.
  pub status: u16,
  /// Time spent producing the response, in microseconds (saturates at
  /// `u64::MAX`).
  pub duration_us: u64,
  /// Request id, present only when a `RequestIdValue` extension was attached
  /// to the request.
  pub request_id: Option<String>,
  /// Peer label, present only when a `ConnInfo` extension was attached to the
  /// request.
  pub peer: Option<String>,
}
44
45type SinkFn = Arc<dyn Fn(AccessRecord) + Send + Sync + 'static>;
46
47/// Access log middleware.
48pub struct AccessLog {
49  sink: SinkFn,
50}
51
52impl Default for AccessLog {
53  fn default() -> Self {
54    Self::new()
55  }
56}
57
58impl AccessLog {
59  /// Creates an access log middleware that writes through `tracing` at INFO.
60  pub fn new() -> Self {
61    Self {
62      sink: Arc::new(|rec: AccessRecord| {
63        tracing::info!(
64          target: "tako::access",
65          method = %rec.method,
66          path = %rec.path,
67          version = %rec.version,
68          status = rec.status,
69          duration_us = rec.duration_us,
70          request_id = rec.request_id.as_deref(),
71          peer = rec.peer.as_deref(),
72          "access",
73        );
74      }),
75    }
76  }
77
78  /// Replaces the default `tracing` sink with a custom one (JSON exporter,
79  /// async channel, file rotation, …).
80  pub fn sink<F>(mut self, f: F) -> Self
81  where
82    F: Fn(AccessRecord) + Send + Sync + 'static,
83  {
84    self.sink = Arc::new(f);
85    self
86  }
87}
88
89fn peer_label(info: &ConnInfo) -> String {
90  match &info.peer {
91    PeerAddr::Ip(sa) => sa.to_string(),
92    PeerAddr::Unix(Some(p)) => format!("unix:{}", p.display()),
93    PeerAddr::Unix(None) => "unix:?".to_string(),
94    PeerAddr::Other(s) => s.clone(),
95  }
96}
97
98impl IntoMiddleware for AccessLog {
99  fn into_middleware(
100    self,
101  ) -> impl Fn(Request, Next) -> Pin<Box<dyn Future<Output = Response> + Send + 'static>>
102  + Clone
103  + Send
104  + Sync
105  + 'static {
106    let sink = self.sink;
107
108    move |req: Request, next: Next| {
109      let sink = sink.clone();
110      Box::pin(async move {
111        let started = Instant::now();
112        let method = req.method().to_string();
113        let path = req.uri().path().to_string();
114        let version = format!("{:?}", req.version());
115        let request_id = req
116          .extensions()
117          .get::<RequestIdValue>()
118          .map(|v| v.0.clone());
119        let peer = req.extensions().get::<ConnInfo>().map(peer_label);
120
121        let resp = next.run(req).await;
122        let elapsed = started.elapsed();
123        let rec = AccessRecord {
124          method,
125          path,
126          version,
127          status: resp.status().as_u16(),
128          duration_us: elapsed.as_micros().min(u128::from(u64::MAX)) as u64,
129          request_id,
130          peer,
131        };
132        sink(rec);
133        resp
134      })
135    }
136  }
137}