// sozu_command_lib/logging/access_logs.rs
use std::{
    collections::BTreeMap, fmt, fmt::Formatter, mem::ManuallyDrop, net::SocketAddr, time::Duration,
};

use rusty_ulid::Ulid;

use crate::{
    logging::{LogLevel, Rfc3339Time},
    proto::command::{
        HttpEndpoint, OpenTelemetry as ProtobufOpenTelemetry, ProtobufAccessLog, ProtobufEndpoint,
        TcpEndpoint, protobuf_endpoint,
    },
};

/// Produces an owning value that aliases the memory behind a borrow, without
/// allocating or copying the pointed-to data.
///
/// The duplicate and the borrowed original refer to the same memory, so at
/// most one of them may ever be dropped. In this file duplicates are only
/// created by `RequestRecord::into_binary_access_log`, which wraps its result
/// in `ManuallyDrop`.
trait DuplicateOwnership {
    /// Owning type produced by [`DuplicateOwnership::duplicate`].
    type Target;

    /// Builds a `Target` that shares its memory with `self`.
    ///
    /// # Safety
    ///
    /// The caller must make sure the returned value is never dropped, never
    /// grown or reallocated, and is not used after the data borrowed by
    /// `self` has gone away.
    unsafe fn duplicate(self) -> Self::Target;
}

impl<T> DuplicateOwnership for &T {
    type Target = T;

    /// Bitwise-copies the referenced value, whether or not `T` is `Copy`.
    unsafe fn duplicate(self) -> T {
        // SAFETY: a reference is always aligned and valid for reads; the
        // caller guarantees the bitwise copy is never dropped.
        unsafe { std::ptr::read(self) }
    }
}

impl<'a, T> DuplicateOwnership for Option<&'a T>
where
    T: ?Sized,
    &'a T: DuplicateOwnership,
{
    type Target = Option<<&'a T as DuplicateOwnership>::Target>;

    /// Duplicates the borrowed value when there is one; `None` stays `None`.
    unsafe fn duplicate(self) -> Self::Target {
        // SAFETY: the contract of the inner call is forwarded to our caller.
        self.map(|borrowed| unsafe { <&'a T as DuplicateOwnership>::duplicate(borrowed) })
    }
}

impl DuplicateOwnership for &str {
    type Target = String;

    /// Wraps the borrowed bytes in a `String` whose capacity equals its length.
    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        // SAFETY: the bytes come from a `&str`, hence are valid UTF-8. The
        // buffer is not owned by the new `String`; the caller guarantees it is
        // never freed or reallocated through it.
        unsafe { String::from_raw_parts(self.as_ptr().cast_mut(), len, len) }
    }
}

impl<T> DuplicateOwnership for &[T] {
    type Target = Vec<T>;

    /// Wraps the borrowed elements in a `Vec` whose capacity equals its length.
    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        // SAFETY: the slice holds `len` initialized elements. The buffer is
        // not owned by the new `Vec`; the caller guarantees it is never freed
        // or reallocated through it.
        unsafe { Vec::from_raw_parts(self.as_ptr().cast_mut(), len, len) }
    }
}

/// Newtype over an optional, borrowed log message.
pub struct LogMessage<'a>(pub Option<&'a str>);
/// Newtype over an optional [`Duration`].
pub struct LogDuration(pub Option<Duration>);

62#[derive(Debug)]
63pub struct LogContext<'a> {
64 pub request_id: Ulid,
65 pub cluster_id: Option<&'a str>,
66 pub backend_id: Option<&'a str>,
67}
68
/// Endpoint details of a logged request, borrowed from the caller.
pub enum EndpointRecord<'a> {
    /// HTTP request/response details; every field is optional.
    Http {
        /// Request method.
        method: Option<&'a str>,
        /// Request authority.
        authority: Option<&'a str>,
        /// Request path.
        path: Option<&'a str>,
        /// Numeric status.
        status: Option<u16>,
        /// Reason text.
        reason: Option<&'a str>,
    },
    /// TCP endpoint; carries no additional data.
    Tcp,
}

/// A tag map together with its pre-rendered textual form.
#[derive(Debug)]
pub struct CachedTags {
    /// The tags, keyed by name.
    pub tags: BTreeMap<String, String>,
    /// Every entry rendered as `key=value`, joined with `", "` in the map's
    /// (sorted-by-key) iteration order. Empty when `tags` is empty.
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of `tags` and renders the `concatenated` form once.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (index, (key, value)) in tags.iter().enumerate() {
            // The separator goes between entries, never before the first one.
            if index > 0 {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}

/// Borrowed view pairing the concatenated tags with the user agent.
#[derive(Debug)]
pub struct FullTags<'a> {
    /// Pre-rendered tag string, when tags are present.
    pub concatenated: Option<&'a str>,
    /// User agent, when there is one.
    pub user_agent: Option<&'a str>,
}

/// Trace identifiers stored as fixed-size byte buffers.
///
/// The buffers are rendered as text by the `Debug` impl below.
/// NOTE(review): presumably they hold ASCII hex digits — confirm against the
/// code that fills them in.
#[derive(Default)]
pub struct OpenTelemetry {
    /// Trace identifier (32 bytes).
    pub trace_id: [u8; 32],
    /// Span identifier (16 bytes).
    pub span_id: [u8; 16],
    /// Parent span identifier (16 bytes), when there is one.
    pub parent_span_id: Option<[u8; 16]>,
}

impl fmt::Debug for OpenTelemetry {
    /// Writes `"<trace_id> <span_id> <parent_span_id>"`, using `-` when there
    /// is no parent span.
    ///
    /// The fields are public, so safe code may store arbitrary bytes in them.
    /// They are therefore decoded with a checked, lossy conversion: valid
    /// UTF-8 is borrowed as-is without allocating, and invalid sequences are
    /// shown as U+FFFD instead of causing undefined behaviour.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let trace_id = String::from_utf8_lossy(&self.trace_id);
        let span_id = String::from_utf8_lossy(&self.span_id);
        match &self.parent_span_id {
            Some(id) => write!(f, "{trace_id} {span_id} {}", String::from_utf8_lossy(id)),
            None => write!(f, "{trace_id} {span_id} -"),
        }
    }
}

124pub struct RequestRecord<'a> {
127 pub message: Option<&'a str>,
128 pub context: LogContext<'a>,
129 pub session_address: Option<SocketAddr>,
130 pub backend_address: Option<SocketAddr>,
131 pub protocol: &'a str,
132 pub endpoint: EndpointRecord<'a>,
133 pub tags: Option<&'a CachedTags>,
134 pub client_rtt: Option<Duration>,
135 pub server_rtt: Option<Duration>,
136 pub user_agent: Option<&'a str>,
137 pub service_time: Duration,
138 pub response_time: Option<Duration>,
140 pub request_time: Duration,
142 pub bytes_in: usize,
143 pub bytes_out: usize,
144 pub otel: Option<&'a OpenTelemetry>,
145
146 pub pid: i32,
148 pub tag: &'a str,
149 pub level: LogLevel,
150 pub now: Rfc3339Time,
151 pub precise_time: i128,
152}
153
154impl RequestRecord<'_> {
155 pub fn full_tags(&self) -> FullTags<'_> {
156 FullTags {
157 concatenated: self.tags.as_ref().map(|t| t.concatenated.as_str()),
158 user_agent: self.user_agent,
159 }
160 }
161
162 pub fn into_binary_access_log(self) -> ManuallyDrop<ProtobufAccessLog> {
166 unsafe {
167 let endpoint = match self.endpoint {
168 EndpointRecord::Http {
169 method,
170 authority,
171 path,
172 status,
173 reason,
174 } => protobuf_endpoint::Inner::Http(HttpEndpoint {
175 method: method.duplicate(),
176 authority: authority.duplicate(),
177 path: path.duplicate(),
178 status: status.map(|s| s as u32),
179 reason: reason.duplicate(),
180 }),
181 EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
182 };
183
184 ManuallyDrop::new(ProtobufAccessLog {
185 backend_address: self.backend_address.map(Into::into),
186 backend_id: self.context.backend_id.duplicate(),
187 bytes_in: self.bytes_in as u64,
188 bytes_out: self.bytes_out as u64,
189 client_rtt: self.client_rtt.map(|t| t.as_micros() as u64),
190 cluster_id: self.context.cluster_id.duplicate(),
191 endpoint: ProtobufEndpoint {
192 inner: Some(endpoint),
193 },
194 message: self.message.duplicate(),
195 protocol: self.protocol.duplicate(),
196 request_id: self.context.request_id.into(),
197 response_time: self.response_time.map(|t| t.as_micros() as u64),
198 server_rtt: self.server_rtt.map(|t| t.as_micros() as u64),
199 service_time: self.service_time.as_micros() as u64,
200 session_address: self.session_address.map(Into::into),
201 tags: self
202 .tags
203 .map(|tags| tags.tags.duplicate())
204 .unwrap_or_default(),
205 user_agent: self.user_agent.duplicate(),
206 tag: self.tag.duplicate(),
207 time: self.precise_time.into(),
208 request_time: Some(self.request_time.as_micros() as u64),
209 otel: self.otel.map(|otel| ProtobufOpenTelemetry {
210 trace_id: std::str::from_utf8_unchecked(&otel.trace_id).duplicate(),
211 span_id: std::str::from_utf8_unchecked(&otel.span_id).duplicate(),
212 parent_span_id: otel
213 .parent_span_id
214 .as_ref()
215 .map(|id| std::str::from_utf8_unchecked(id).duplicate()),
216 }),
217 })
218 }
219 }
220}