sozu_command_lib/logging/access_logs.rs
1use std::{
2 collections::BTreeMap, fmt, fmt::Formatter, mem::ManuallyDrop, net::SocketAddr, time::Duration,
3};
4
5use rusty_ulid::Ulid;
6
7use crate::{
8 logging::{LogLevel, Rfc3339Time},
9 proto::command::{
10 HttpEndpoint, OpenTelemetry as ProtobufOpenTelemetry, ProtobufAccessLog, ProtobufEndpoint,
11 TcpEndpoint, protobuf_endpoint,
12 },
13};
14
/// Uses `unsafe` to create a second, "fake" owner of data that is only borrowed.
///
/// For the compiler the duplicate is as legitimate as the original owner, so
/// exactly one of the two owners has to be neutralised (with
/// `std::mem::forget` or `ManuallyDrop`) before it would be dropped; otherwise
/// the same memory is released twice.
///
/// Implemented for `&T`, `&str`, `&[T]` and for `Option`s of those references.
///
/// A performance review suggested this is not noticeably faster than calling
/// `clone()`, probably because modern allocators cache so well.
trait DuplicateOwnership {
    /// Owned type handed back by [`DuplicateOwnership::duplicate`].
    type Target;

    /// Builds an owned `Target` that aliases the data behind `self` without copying it.
    ///
    /// # Safety
    ///
    /// The returned value shares its memory with the borrowed original. The
    /// caller must make sure only one of the two owners is ever dropped, by
    /// wrapping the other in `ManuallyDrop` or passing it to `std::mem::forget`.
    unsafe fn duplicate(self) -> Self::Target;
}

impl<T> DuplicateOwnership for &T {
    type Target = T;

    unsafe fn duplicate(self) -> T {
        let source: *const T = self;
        // SAFETY: `source` comes from a live reference, so it is valid for
        // reads and properly aligned. The read is a bitwise copy of the
        // pointee, so the result shares every resource the original owns; the
        // caller upholds the "drop only one owner" contract of `duplicate`.
        unsafe { source.read() }
    }
}

impl<'a, T> DuplicateOwnership for Option<&'a T>
where
    T: ?Sized,
    &'a T: DuplicateOwnership,
{
    type Target = Option<<&'a T as DuplicateOwnership>::Target>;

    unsafe fn duplicate(self) -> Self::Target {
        // SAFETY: simply forwards to the implementation for the inner
        // reference; the caller's obligation not to drop both owners carries
        // over unchanged.
        self.map(|borrowed| unsafe { borrowed.duplicate() })
    }
}

impl DuplicateOwnership for &str {
    type Target = String;

    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        let bytes = self.as_ptr().cast_mut();
        // SAFETY: the bytes are valid UTF-8 because they come from a `&str`.
        // The resulting `String` points at the very same bytes as `self`
        // (length and capacity are both set to `self.len()`), nothing is
        // copied. The caller must keep it inside `ManuallyDrop` (or forget
        // it) so that this memory is never released through the duplicate.
        // NOTE(review): the memory is not necessarily a heap allocation owned
        // by the duplicate, so it must not be grown or shrunk either.
        unsafe { String::from_raw_parts(bytes, len, len) }
    }
}

impl<T> DuplicateOwnership for &[T] {
    type Target = Vec<T>;

    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        let elements = self.as_ptr().cast_mut();
        // SAFETY: the resulting `Vec` points at the very same elements as
        // `self` (length and capacity are both set to `self.len()`), nothing
        // is copied. The caller must keep it inside `ManuallyDrop` (or forget
        // it) so that neither the elements nor the buffer are released
        // through the duplicate.
        unsafe { Vec::from_raw_parts(elements, len, len) }
    }
}
77
/// Newtype around the optional, borrowed message of a log line.
///
/// NOTE(review): presumably exists so the message gets its own formatting
/// impl elsewhere in the crate — confirm against the formatter code.
pub struct LogMessage<'a>(pub Option<&'a str>);
/// Newtype around an optional [`Duration`] reported in a log line.
///
/// NOTE(review): presumably exists so durations get their own formatting
/// impl elsewhere in the crate — confirm against the formatter code.
pub struct LogDuration(pub Option<Duration>);
80
81/// Prefix block attached to every log line. Rendered as
82/// `[<session_id> <request_id_or_-> <cluster_id_or_-> <backend_id_or_->]`
83/// so operators can grep a full TCP/TLS session (`session_id`) or drill
84/// into one HTTP exchange (`request_id`).
85///
86/// - `session_id` is generated once per mio-accepted socket and survives
87/// protocol upgrades (expect-proxy, TLS handshake, H1↔H2 multiplexing).
88/// - `request_id` is per-request: distinct for each H2 stream, and
89/// regenerated on each H1 keep-alive exchange.
90#[derive(Debug)]
91pub struct LogContext<'a> {
92 pub session_id: Ulid,
93 pub request_id: Option<Ulid>,
94 pub cluster_id: Option<&'a str>,
95 pub backend_id: Option<&'a str>,
96}
97
/// Protocol-specific part of an access log record.
pub enum EndpointRecord<'a> {
    /// HTTP exchange; every field is optional because it may not be known
    /// when the record is emitted.
    Http {
        /// Request method.
        method: Option<&'a str>,
        /// Request authority.
        authority: Option<&'a str>,
        /// Request path.
        path: Option<&'a str>,
        /// Response status code.
        status: Option<u16>,
        /// Reason text associated with the response.
        reason: Option<&'a str>,
    },
    /// Plain TCP session; carries no additional data.
    Tcp,
}
108
/// Tags aggregated for a session, stored together with their pre-rendered
/// textual form so the rendering is done only once.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedTags {
    /// The tags themselves, keyed by tag name.
    pub tags: BTreeMap<String, String>,
    /// All tags rendered as `key=value` pairs joined with `", "`, in the
    /// map's key order.
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of `tags` and renders the `key=value, key=value`
    /// string once, up front. An empty map yields an empty string.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (key, value) in &tags {
            // Every rendered pair contains at least the `=` sign, so the
            // buffer is empty only before the first pair is written.
            if !concatenated.is_empty() {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}
126
/// Borrowed view of the tag-related data of a record: the pre-rendered tag
/// string and the user agent.
#[derive(Debug)]
pub struct FullTags<'a> {
    /// Pre-rendered `key=value, ...` tag string, if the record has tags.
    pub concatenated: Option<&'a str>,
    /// User agent of the request, if known.
    pub user_agent: Option<&'a str>,
}
132
/// OpenTelemetry identifiers attached to a request, stored as fixed-size
/// byte arrays holding the textual form of each id.
///
/// The ids are expected to contain ASCII hexadecimal digits. The fields are
/// public, though, so this type cannot enforce that: consumers must not
/// assume the bytes are valid UTF-8 without checking.
#[derive(Default)]
pub struct OpenTelemetry {
    /// Trace id, 32 bytes of text.
    pub trace_id: [u8; 32],
    /// Span id, 16 bytes of text.
    pub span_id: [u8; 16],
    /// Parent span id, 16 bytes of text, when there is a parent span.
    pub parent_span_id: Option<[u8; 16]>,
}

impl fmt::Debug for OpenTelemetry {
    /// Writes `<trace_id> <span_id> <parent_span_id>`, with `-` standing in
    /// for a missing parent span.
    ///
    /// An id that is not valid UTF-8 is rendered as `<invalid>`. Because the
    /// fields are public, safe code can store arbitrary bytes in them, so the
    /// conversion has to be checked to keep this safe function sound.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Checked view of an id as text; validating at most 32 bytes is
        // negligible next to the formatting itself.
        fn id_text(bytes: &[u8]) -> &str {
            std::str::from_utf8(bytes).unwrap_or("<invalid>")
        }

        let trace_id = id_text(&self.trace_id);
        let span_id = id_text(&self.span_id);
        let parent_span_id = self
            .parent_span_id
            .as_ref()
            .map(|id| id_text(id))
            .unwrap_or("-");
        write!(f, "{trace_id} {span_id} {parent_span_id}")
    }
}
160
161/// Intermediate representation of an access log agnostic of the final format.
162/// Every field is a reference to avoid capturing ownership (as a logger should).
163pub struct RequestRecord<'a> {
164 pub message: Option<&'a str>,
165 pub context: LogContext<'a>,
166 pub session_address: Option<SocketAddr>,
167 pub backend_address: Option<SocketAddr>,
168 pub protocol: &'a str,
169 pub endpoint: EndpointRecord<'a>,
170 pub tags: Option<&'a CachedTags>,
171 pub client_rtt: Option<Duration>,
172 pub server_rtt: Option<Duration>,
173 pub user_agent: Option<&'a str>,
174 /// Value of the `x-request-id` header forwarded to the backend. Preserved
175 /// verbatim when the client supplied one; otherwise derived from the
176 /// request ULID (`context.request_id`). Used by downstream observability
177 /// pipelines as a universal correlation key.
178 pub x_request_id: Option<&'a str>,
179 /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
180 /// Static-string borrow plumbed straight from rustls. `None` for
181 /// plaintext listeners or when the version label is unknown.
182 pub tls_version: Option<&'static str>,
183 /// Negotiated TLS cipher suite short-form (e.g.
184 /// `"TLS_AES_128_GCM_SHA256"`). Static-string borrow plumbed straight
185 /// from rustls. `None` for plaintext listeners or when the cipher
186 /// label is unknown.
187 pub tls_cipher: Option<&'static str>,
188 /// TLS Server Name Indication sent by the client at handshake (already
189 /// pre-lowercased, no port). `None` for plaintext listeners or when the
190 /// client omitted the SNI extension.
191 pub tls_sni: Option<&'a str>,
192 /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). `None` for
193 /// plaintext listeners or when no ALPN was negotiated.
194 pub tls_alpn: Option<&'static str>,
195 /// Verbatim value of the client-supplied `X-Forwarded-For` header as
196 /// observed before Sōzu appended its own hop. `None` if the request had
197 /// no `X-Forwarded-For` header.
198 pub xff_chain: Option<&'a str>,
199 pub service_time: Duration,
200 /// time from connecting to the backend until the end of the response
201 pub response_time: Option<Duration>,
202 /// time between first byte of the request and last byte of the response
203 pub request_time: Duration,
204 pub bytes_in: usize,
205 pub bytes_out: usize,
206 pub otel: Option<&'a OpenTelemetry>,
207
208 // added by the logger itself
209 pub pid: i32,
210 pub tag: &'a str,
211 pub level: LogLevel,
212 pub now: Rfc3339Time,
213 pub precise_time: i128,
214}
215
216impl RequestRecord<'_> {
217 pub fn full_tags(&self) -> FullTags<'_> {
218 FullTags {
219 concatenated: self.tags.as_ref().map(|t| t.concatenated.as_str()),
220 user_agent: self.user_agent,
221 }
222 }
223
224 /// Converts the RequestRecord in its protobuf representation.
225 /// Prost needs ownership over all the fields but we don't want to take it from the user
226 /// or clone them, so we use the unsafe DuplicateOwnership.
227 pub fn into_binary_access_log(self) -> ManuallyDrop<ProtobufAccessLog> {
228 // SAFETY: Each `.duplicate()` call below borrows ownership without
229 // copying. The whole protobuf is wrapped in `ManuallyDrop` so its
230 // destructor never runs — the original `RequestRecord` references
231 // remain the sole owners. `std::str::from_utf8_unchecked` on `otel`
232 // fields is sound because `trace_id` / `span_id` are ASCII hex
233 // (see the `OpenTelemetry` Debug impl above for the same proof).
234 unsafe {
235 let endpoint = match self.endpoint {
236 EndpointRecord::Http {
237 method,
238 authority,
239 path,
240 status,
241 reason,
242 } => protobuf_endpoint::Inner::Http(HttpEndpoint {
243 method: method.duplicate(),
244 authority: authority.duplicate(),
245 path: path.duplicate(),
246 status: status.map(|s| s as u32),
247 reason: reason.duplicate(),
248 }),
249 EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
250 };
251
252 ManuallyDrop::new(ProtobufAccessLog {
253 backend_address: self.backend_address.map(Into::into),
254 backend_id: self.context.backend_id.duplicate(),
255 bytes_in: self.bytes_in as u64,
256 bytes_out: self.bytes_out as u64,
257 client_rtt: self.client_rtt.map(|t| t.as_micros() as u64),
258 cluster_id: self.context.cluster_id.duplicate(),
259 endpoint: ProtobufEndpoint {
260 inner: Some(endpoint),
261 },
262 message: self.message.duplicate(),
263 protocol: self.protocol.duplicate(),
264 request_id: self
265 .context
266 .request_id
267 .unwrap_or(self.context.session_id)
268 .into(),
269 session_id: Some(self.context.session_id.into()),
270 response_time: self.response_time.map(|t| t.as_micros() as u64),
271 server_rtt: self.server_rtt.map(|t| t.as_micros() as u64),
272 service_time: self.service_time.as_micros() as u64,
273 session_address: self.session_address.map(Into::into),
274 tags: self
275 .tags
276 .map(|tags| tags.tags.duplicate())
277 .unwrap_or_default(),
278 user_agent: self.user_agent.duplicate(),
279 x_request_id: self.x_request_id.duplicate(),
280 tls_version: self.tls_version.duplicate(),
281 tls_cipher: self.tls_cipher.duplicate(),
282 tls_sni: self.tls_sni.duplicate(),
283 tls_alpn: self.tls_alpn.duplicate(),
284 xff_chain: self.xff_chain.duplicate(),
285 tag: self.tag.duplicate(),
286 time: self.precise_time.into(),
287 request_time: Some(self.request_time.as_micros() as u64),
288 otel: self.otel.map(|otel| ProtobufOpenTelemetry {
289 trace_id: std::str::from_utf8_unchecked(&otel.trace_id).duplicate(),
290 span_id: std::str::from_utf8_unchecked(&otel.span_id).duplicate(),
291 parent_span_id: otel
292 .parent_span_id
293 .as_ref()
294 .map(|id| std::str::from_utf8_unchecked(id).duplicate()),
295 }),
296 })
297 }
298 }
299}