// tako_rs_plugins/middleware/access_log.rs
use std::future::Future;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::Instant;
23
24use tako_rs_core::conn_info::ConnInfo;
25use tako_rs_core::conn_info::PeerAddr;
26use tako_rs_core::middleware::IntoMiddleware;
27use tako_rs_core::middleware::Next;
28use tako_rs_core::types::Request;
29use tako_rs_core::types::Response;
30
31use super::request_id::RequestIdValue;
32
/// One completed request/response exchange, as handed to the access-log sink.
///
/// A record is built by the access-log middleware after the inner handler has
/// produced its response. It derives `PartialEq`/`Eq` so that custom sinks can
/// be checked in tests with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessRecord {
    /// Request method rendered as text (from `req.method().to_string()`).
    pub method: String,
    /// Path component of the request URI (from `req.uri().path()`).
    pub path: String,
    /// `Debug` rendering of the request's protocol version.
    pub version: String,
    /// Numeric status code of the response.
    pub status: u16,
    /// Time spent in the rest of the chain, in microseconds, saturating at
    /// `u64::MAX`.
    pub duration_us: u64,
    /// Request id taken from the `RequestIdValue` request extension, if one
    /// was present.
    pub request_id: Option<String>,
    /// Label of the remote peer derived from the `ConnInfo` request extension,
    /// if one was present.
    pub peer: Option<String>,
}
44
45type SinkFn = Arc<dyn Fn(AccessRecord) + Send + Sync + 'static>;
46
47pub struct AccessLog {
49 sink: SinkFn,
50}
51
52impl Default for AccessLog {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl AccessLog {
59 pub fn new() -> Self {
61 Self {
62 sink: Arc::new(|rec: AccessRecord| {
63 tracing::info!(
64 target: "tako::access",
65 method = %rec.method,
66 path = %rec.path,
67 version = %rec.version,
68 status = rec.status,
69 duration_us = rec.duration_us,
70 request_id = rec.request_id.as_deref(),
71 peer = rec.peer.as_deref(),
72 "access",
73 );
74 }),
75 }
76 }
77
78 pub fn sink<F>(mut self, f: F) -> Self
81 where
82 F: Fn(AccessRecord) + Send + Sync + 'static,
83 {
84 self.sink = Arc::new(f);
85 self
86 }
87}
88
89fn peer_label(info: &ConnInfo) -> String {
90 match &info.peer {
91 PeerAddr::Ip(sa) => sa.to_string(),
92 PeerAddr::Unix(Some(p)) => format!("unix:{}", p.display()),
93 PeerAddr::Unix(None) => "unix:?".to_string(),
94 PeerAddr::Other(s) => s.clone(),
95 }
96}
97
98impl IntoMiddleware for AccessLog {
99 fn into_middleware(
100 self,
101 ) -> impl Fn(Request, Next) -> Pin<Box<dyn Future<Output = Response> + Send + 'static>>
102 + Clone
103 + Send
104 + Sync
105 + 'static {
106 let sink = self.sink;
107
108 move |req: Request, next: Next| {
109 let sink = sink.clone();
110 Box::pin(async move {
111 let started = Instant::now();
112 let method = req.method().to_string();
113 let path = req.uri().path().to_string();
114 let version = format!("{:?}", req.version());
115 let request_id = req
116 .extensions()
117 .get::<RequestIdValue>()
118 .map(|v| v.0.clone());
119 let peer = req.extensions().get::<ConnInfo>().map(peer_label);
120
121 let resp = next.run(req).await;
122 let elapsed = started.elapsed();
123 let rec = AccessRecord {
124 method,
125 path,
126 version,
127 status: resp.status().as_u16(),
128 duration_us: elapsed.as_micros().min(u128::from(u64::MAX)) as u64,
129 request_id,
130 peer,
131 };
132 sink(rec);
133 resp
134 })
135 }
136 }
137}