Skip to main content

sozu_command_lib/logging/
access_logs.rs

1use std::{
2    collections::BTreeMap, fmt, fmt::Formatter, mem::ManuallyDrop, net::SocketAddr, time::Duration,
3};
4
5use rusty_ulid::Ulid;
6
7use crate::{
8    logging::{LogLevel, Rfc3339Time},
9    proto::command::{
10        HttpEndpoint, OpenTelemetry as ProtobufOpenTelemetry, ProtobufAccessLog, ProtobufEndpoint,
11        TcpEndpoint, protobuf_endpoint,
12    },
13};
14
/// Creates a second, "fake" owner of data that is only borrowed.
///
/// Implementations rely on `unsafe` to build an owned value on top of memory
/// that still belongs to someone else. For the compiler the duplicate is as
/// legitimate as the original owner, so one of the two has to be elided
/// (with `std::mem::forget` or `ManuallyDrop`) before it is dropped,
/// otherwise the data is freed twice.
///
/// This trait works on `&T` and `Option<&T>` types.
///
/// After a performance review, this does not seem to be any more efficient
/// than calling `clone()`, probably because the cache of malloc is so well
/// optimized these days.
trait DuplicateOwnership {
    /// Owned type produced from the borrowed receiver.
    type Target;

    /// Builds an owned `Target` that aliases the data behind `self`.
    ///
    /// # Safety
    ///
    /// The returned value and the original owner both believe they own the
    /// same data. The caller must wrap one of them in `ManuallyDrop` (or
    /// `std::mem::forget` it) so that only one destructor ever runs.
    unsafe fn duplicate(self) -> Self::Target;
}
29
30impl<T> DuplicateOwnership for &T {
31    type Target = T;
32    unsafe fn duplicate(self) -> T {
33        // SAFETY: `std::ptr::read` duplicates the bit-pattern of the
34        // pointee without copying. The result aliases the original value's
35        // ownership view, so the caller MUST ensure exactly one of the two
36        // owners is wrapped in `ManuallyDrop` or `mem::forget`'d before
37        // either is dropped (cf. the type-level guidance at line 26).
38        unsafe { std::ptr::read(self as *const T) }
39    }
40}
41impl<'a, T> DuplicateOwnership for Option<&'a T>
42where
43    T: ?Sized,
44    &'a T: DuplicateOwnership,
45{
46    type Target = Option<<&'a T as DuplicateOwnership>::Target>;
47    unsafe fn duplicate(self) -> Self::Target {
48        // SAFETY: delegates to the inner `&T::duplicate` impl above. The
49        // same double-free obligation propagates to the caller (cf. the
50        // type-level guidance at line 26).
51        unsafe { self.map(|t| t.duplicate()) }
52    }
53}
54impl DuplicateOwnership for &str {
55    type Target = String;
56    unsafe fn duplicate(self) -> Self::Target {
57        // SAFETY: `String::from_raw_parts` reuses the underlying allocation
58        // owned by `self` without copying, so the result aliases that
59        // ownership view. Caller MUST ensure exactly one owner is wrapped
60        // in `ManuallyDrop` or `mem::forget`'d to avoid double-free
61        // (cf. the type-level guidance at line 26). The bytes are valid
62        // UTF-8 because they came from a `&str`.
63        unsafe { String::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len()) }
64    }
65}
66impl<T> DuplicateOwnership for &[T] {
67    type Target = Vec<T>;
68    unsafe fn duplicate(self) -> Self::Target {
69        // SAFETY: `Vec::from_raw_parts` reuses the underlying allocation
70        // owned by `self` without copying, so the result aliases that
71        // ownership view. Caller MUST ensure exactly one owner is wrapped
72        // in `ManuallyDrop` or `mem::forget`'d to avoid double-free
73        // (cf. the type-level guidance at line 26).
74        unsafe { Vec::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len()) }
75    }
76}
77
/// Newtype around an optional, borrowed log message.
///
/// NOTE(review): presumably exists so that a dedicated formatting impl can be
/// attached elsewhere in the crate — confirm.
pub struct LogMessage<'a>(pub Option<&'a str>);
/// Newtype around an optional [`Duration`].
///
/// NOTE(review): presumably exists so that a dedicated formatting impl can be
/// attached elsewhere in the crate — confirm.
pub struct LogDuration(pub Option<Duration>);
80
81/// Prefix block attached to every log line. Rendered as
82/// `[<session_id> <request_id_or_-> <cluster_id_or_-> <backend_id_or_->]`
83/// so operators can grep a full TCP/TLS session (`session_id`) or drill
84/// into one HTTP exchange (`request_id`).
85///
86/// - `session_id` is generated once per mio-accepted socket and survives
87///   protocol upgrades (expect-proxy, TLS handshake, H1↔H2 multiplexing).
88/// - `request_id` is per-request: distinct for each H2 stream, and
89///   regenerated on each H1 keep-alive exchange.
90#[derive(Debug)]
91pub struct LogContext<'a> {
92    pub session_id: Ulid,
93    pub request_id: Option<Ulid>,
94    pub cluster_id: Option<&'a str>,
95    pub backend_id: Option<&'a str>,
96}
97
/// Protocol-specific part of an access log entry.
pub enum EndpointRecord<'a> {
    /// Details of an HTTP exchange; every piece is optional.
    Http {
        /// Request method.
        method: Option<&'a str>,
        /// Request authority.
        authority: Option<&'a str>,
        /// Request path.
        path: Option<&'a str>,
        /// Response status code.
        status: Option<u16>,
        /// Response reason phrase.
        reason: Option<&'a str>,
    },
    /// A TCP session: no additional details are recorded.
    Tcp,
}
108
/// Tags aggregated for a session, kept together with their rendered form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedTags {
    /// Tag key/value pairs.
    pub tags: BTreeMap<String, String>,
    /// The tags rendered as `key=value` pairs separated by `", "`, computed
    /// once by [`CachedTags::new`].
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of `tags` and pre-renders them as `k1=v1, k2=v2, …`
    /// in the iteration order of the map. An empty map yields an empty string.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (position, (key, value)) in tags.iter().enumerate() {
            // The separator goes between entries, never before the first one.
            if position != 0 {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}
126
/// Borrowed view over the tag-like data attached to a log entry.
#[derive(Debug)]
pub struct FullTags<'a> {
    /// Pre-rendered tags, when tags are attached.
    pub concatenated: Option<&'a str>,
    /// User agent, when one is known.
    pub user_agent: Option<&'a str>,
}
132
/// OpenTelemetry identifiers attached to a request.
///
/// The identifiers are stored as raw bytes that are expected to be ASCII
/// hexadecimal digits. That expectation is a convention of the code filling
/// this struct, not something the type enforces: the fields are public and
/// `Default` fills them with zero bytes.
#[derive(Default)]
pub struct OpenTelemetry {
    /// Trace identifier, 32 bytes (expected: ASCII hex digits).
    pub trace_id: [u8; 32],
    /// Span identifier, 16 bytes (expected: ASCII hex digits).
    pub span_id: [u8; 16],
    /// Parent span identifier, when there is one (expected: ASCII hex digits).
    pub parent_span_id: Option<[u8; 16]>,
}

impl fmt::Debug for OpenTelemetry {
    /// Writes `<trace_id> <span_id> <parent_span_id>`, using `-` when there
    /// is no parent span.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // The fields are public byte arrays, so safe code can store bytes
        // that are not valid UTF-8. A safe `fmt` must not turn that into
        // undefined behavior, hence the checked conversion. For valid input
        // (the expected ASCII hex) `from_utf8_lossy` borrows without
        // allocating and the output is unchanged; invalid sequences are
        // rendered as U+FFFD.
        let trace_id = String::from_utf8_lossy(&self.trace_id);
        let span_id = String::from_utf8_lossy(&self.span_id);
        let parent_span_id = match &self.parent_span_id {
            Some(id) => String::from_utf8_lossy(id),
            None => std::borrow::Cow::Borrowed("-"),
        };
        write!(f, "{trace_id} {span_id} {parent_span_id}")
    }
}
160
161/// Intermediate representation of an access log agnostic of the final format.
162/// Every field is a reference to avoid capturing ownership (as a logger should).
163pub struct RequestRecord<'a> {
164    pub message: Option<&'a str>,
165    pub context: LogContext<'a>,
166    pub session_address: Option<SocketAddr>,
167    pub backend_address: Option<SocketAddr>,
168    pub protocol: &'a str,
169    pub endpoint: EndpointRecord<'a>,
170    pub tags: Option<&'a CachedTags>,
171    pub client_rtt: Option<Duration>,
172    pub server_rtt: Option<Duration>,
173    pub user_agent: Option<&'a str>,
174    /// Value of the `x-request-id` header forwarded to the backend. Preserved
175    /// verbatim when the client supplied one; otherwise derived from the
176    /// request ULID (`context.request_id`). Used by downstream observability
177    /// pipelines as a universal correlation key.
178    pub x_request_id: Option<&'a str>,
179    /// Negotiated TLS protocol version short-form (e.g. `"TLSv1.3"`).
180    /// Static-string borrow plumbed straight from rustls. `None` for
181    /// plaintext listeners or when the version label is unknown.
182    pub tls_version: Option<&'static str>,
183    /// Negotiated TLS cipher suite short-form (e.g.
184    /// `"TLS_AES_128_GCM_SHA256"`). Static-string borrow plumbed straight
185    /// from rustls. `None` for plaintext listeners or when the cipher
186    /// label is unknown.
187    pub tls_cipher: Option<&'static str>,
188    /// TLS Server Name Indication sent by the client at handshake (already
189    /// pre-lowercased, no port). `None` for plaintext listeners or when the
190    /// client omitted the SNI extension.
191    pub tls_sni: Option<&'a str>,
192    /// Negotiated ALPN protocol (e.g. `"h2"`, `"http/1.1"`). `None` for
193    /// plaintext listeners or when no ALPN was negotiated.
194    pub tls_alpn: Option<&'static str>,
195    /// Verbatim value of the client-supplied `X-Forwarded-For` header as
196    /// observed before Sōzu appended its own hop. `None` if the request had
197    /// no `X-Forwarded-For` header.
198    pub xff_chain: Option<&'a str>,
199    pub service_time: Duration,
200    /// time from connecting to the backend until the end of the response
201    pub response_time: Option<Duration>,
202    /// time between first byte of the request and last byte of the response
203    pub request_time: Duration,
204    pub bytes_in: usize,
205    pub bytes_out: usize,
206    pub otel: Option<&'a OpenTelemetry>,
207
208    // added by the logger itself
209    pub pid: i32,
210    pub tag: &'a str,
211    pub level: LogLevel,
212    pub now: Rfc3339Time,
213    pub precise_time: i128,
214}
215
216impl RequestRecord<'_> {
217    pub fn full_tags(&self) -> FullTags<'_> {
218        FullTags {
219            concatenated: self.tags.as_ref().map(|t| t.concatenated.as_str()),
220            user_agent: self.user_agent,
221        }
222    }
223
224    /// Converts the RequestRecord in its protobuf representation.
225    /// Prost needs ownership over all the fields but we don't want to take it from the user
226    /// or clone them, so we use the unsafe DuplicateOwnership.
227    pub fn into_binary_access_log(self) -> ManuallyDrop<ProtobufAccessLog> {
228        // SAFETY: Each `.duplicate()` call below borrows ownership without
229        // copying. The whole protobuf is wrapped in `ManuallyDrop` so its
230        // destructor never runs — the original `RequestRecord` references
231        // remain the sole owners. `std::str::from_utf8_unchecked` on `otel`
232        // fields is sound because `trace_id` / `span_id` are ASCII hex
233        // (see the `OpenTelemetry` Debug impl above for the same proof).
234        unsafe {
235            let endpoint = match self.endpoint {
236                EndpointRecord::Http {
237                    method,
238                    authority,
239                    path,
240                    status,
241                    reason,
242                } => protobuf_endpoint::Inner::Http(HttpEndpoint {
243                    method: method.duplicate(),
244                    authority: authority.duplicate(),
245                    path: path.duplicate(),
246                    status: status.map(|s| s as u32),
247                    reason: reason.duplicate(),
248                }),
249                EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
250            };
251
252            ManuallyDrop::new(ProtobufAccessLog {
253                backend_address: self.backend_address.map(Into::into),
254                backend_id: self.context.backend_id.duplicate(),
255                bytes_in: self.bytes_in as u64,
256                bytes_out: self.bytes_out as u64,
257                client_rtt: self.client_rtt.map(|t| t.as_micros() as u64),
258                cluster_id: self.context.cluster_id.duplicate(),
259                endpoint: ProtobufEndpoint {
260                    inner: Some(endpoint),
261                },
262                message: self.message.duplicate(),
263                protocol: self.protocol.duplicate(),
264                request_id: self
265                    .context
266                    .request_id
267                    .unwrap_or(self.context.session_id)
268                    .into(),
269                session_id: Some(self.context.session_id.into()),
270                response_time: self.response_time.map(|t| t.as_micros() as u64),
271                server_rtt: self.server_rtt.map(|t| t.as_micros() as u64),
272                service_time: self.service_time.as_micros() as u64,
273                session_address: self.session_address.map(Into::into),
274                tags: self
275                    .tags
276                    .map(|tags| tags.tags.duplicate())
277                    .unwrap_or_default(),
278                user_agent: self.user_agent.duplicate(),
279                x_request_id: self.x_request_id.duplicate(),
280                tls_version: self.tls_version.duplicate(),
281                tls_cipher: self.tls_cipher.duplicate(),
282                tls_sni: self.tls_sni.duplicate(),
283                tls_alpn: self.tls_alpn.duplicate(),
284                xff_chain: self.xff_chain.duplicate(),
285                tag: self.tag.duplicate(),
286                time: self.precise_time.into(),
287                request_time: Some(self.request_time.as_micros() as u64),
288                otel: self.otel.map(|otel| ProtobufOpenTelemetry {
289                    trace_id: std::str::from_utf8_unchecked(&otel.trace_id).duplicate(),
290                    span_id: std::str::from_utf8_unchecked(&otel.span_id).duplicate(),
291                    parent_span_id: otel
292                        .parent_span_id
293                        .as_ref()
294                        .map(|id| std::str::from_utf8_unchecked(id).duplicate()),
295                }),
296            })
297        }
298    }
299}