sozu_command_lib/logging/
access_logs.rs

1use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr, time::Duration};
2
3use rusty_ulid::Ulid;
4
5use crate::{
6    logging::{LogLevel, Rfc3339Time},
7    proto::command::{
8        protobuf_endpoint, HttpEndpoint, ProtobufAccessLog, ProtobufEndpoint, TcpEndpoint,
9    },
10};
11
/// Creates a second, "fake" owner of data that is only borrowed, using unsafe code.
///
/// For the compiler the duplicate is as legitimate as the original owner, so
/// one of the two has to be kept from being dropped (with `std::mem::forget`
/// or `ManuallyDrop`); otherwise the same data is freed twice.
///
/// This trait is implemented on `&T` and `Option<&T>` types.
///
/// After a performance review, it does not seem to be any more efficient than
/// calling `clone()`, probably because the cache of malloc is so well
/// optimized these days.
trait DuplicateOwnership {
    /// Owned type produced by the duplication.
    type Target;

    /// Returns an owned value that aliases the data behind `self`.
    ///
    /// # Safety
    ///
    /// The returned value and the original owner both act as owners of the
    /// same data. The caller must make sure that only one of them is ever
    /// dropped, by using `std::mem::forget` or `ManuallyDrop` on the other.
    unsafe fn duplicate(self) -> Self::Target;
}
26
27impl<T> DuplicateOwnership for &T {
28    type Target = T;
29    unsafe fn duplicate(self) -> T {
30        std::ptr::read(self as *const T)
31    }
32}
33impl<'a, T> DuplicateOwnership for Option<&'a T>
34where
35    T: ?Sized,
36    &'a T: DuplicateOwnership,
37{
38    type Target = Option<<&'a T as DuplicateOwnership>::Target>;
39    unsafe fn duplicate(self) -> Self::Target {
40        self.map(|t| t.duplicate())
41    }
42}
43impl DuplicateOwnership for &str {
44    type Target = String;
45    unsafe fn duplicate(self) -> Self::Target {
46        String::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len())
47    }
48}
49impl<T> DuplicateOwnership for &[T] {
50    type Target = Vec<T>;
51    unsafe fn duplicate(self) -> Self::Target {
52        Vec::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len())
53    }
54}
55
/// Newtype around the optional, borrowed message of a log.
// NOTE(review): presumably exists so a dedicated formatting impl can be
// attached to it elsewhere; confirm.
pub struct LogMessage<'a>(pub Option<&'a str>);
/// Newtype around an optional duration.
// NOTE(review): presumably exists so a dedicated formatting impl can be
// attached to it elsewhere; confirm.
pub struct LogDuration(pub Option<Duration>);
58
59#[derive(Debug)]
60pub struct LogContext<'a> {
61    pub request_id: Ulid,
62    pub cluster_id: Option<&'a str>,
63    pub backend_id: Option<&'a str>,
64}
65
/// Endpoint part of an access log, either HTTP or TCP.
/// All strings are borrowed.
pub enum EndpointRecord<'a> {
    /// HTTP endpoint; every field is optional
    Http {
        method: Option<&'a str>,
        authority: Option<&'a str>,
        path: Option<&'a str>,
        /// numeric status code
        status: Option<u16>,
        reason: Option<&'a str>,
    },
    /// TCP endpoint, carries no data
    Tcp,
}
76
/// Aggregates the tags of a session, keeping both the map and a
/// pre-rendered string version of it.
#[derive(Debug)]
pub struct CachedTags {
    /// the tags, as key/value pairs
    pub tags: BTreeMap<String, String>,
    /// all tags rendered as `key=value`, separated by `", "`
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of the tags and renders the concatenated string once,
    /// following the iteration order of the map.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (index, (key, value)) in tags.iter().enumerate() {
            // the separator goes between entries, never before the first one
            if index > 0 {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}
94
/// Borrowed pair of tag-related strings: the concatenated tags and the
/// user agent, both optional.
#[derive(Debug)]
pub struct FullTags<'a> {
    /// tags already joined into a single string
    pub concatenated: Option<&'a str>,
    /// user agent, if any
    pub user_agent: Option<&'a str>,
}
100
101/// Intermediate representation of an access log agnostic of the final format.
102/// Every field is a reference to avoid capturing ownership (as a logger should).
103pub struct RequestRecord<'a> {
104    pub message: Option<&'a str>,
105    pub context: LogContext<'a>,
106    pub session_address: Option<SocketAddr>,
107    pub backend_address: Option<SocketAddr>,
108    pub protocol: &'a str,
109    pub endpoint: EndpointRecord<'a>,
110    pub tags: Option<&'a CachedTags>,
111    pub client_rtt: Option<Duration>,
112    pub server_rtt: Option<Duration>,
113    pub user_agent: Option<&'a str>,
114    pub service_time: Duration,
115    /// time from connecting to the backend until the end of the response
116    pub response_time: Option<Duration>,
117    /// time between first byte of the request and last byte of the response
118    pub request_time: Duration,
119    pub bytes_in: usize,
120    pub bytes_out: usize,
121
122    // added by the logger itself
123    pub pid: i32,
124    pub tag: &'a str,
125    pub level: LogLevel,
126    pub now: Rfc3339Time,
127    pub precise_time: i128,
128}
129
130impl RequestRecord<'_> {
131    pub fn full_tags(&self) -> FullTags {
132        FullTags {
133            concatenated: self.tags.as_ref().map(|t| t.concatenated.as_str()),
134            user_agent: self.user_agent,
135        }
136    }
137
138    /// Converts the RequestRecord in its protobuf representation.
139    /// Prost needs ownership over all the fields but we don't want to take it from the user
140    /// or clone them, so we use the unsafe DuplicateOwnership.
141    pub fn into_binary_access_log(self) -> ManuallyDrop<ProtobufAccessLog> {
142        unsafe {
143            let endpoint = match self.endpoint {
144                EndpointRecord::Http {
145                    method,
146                    authority,
147                    path,
148                    status,
149                    reason,
150                } => protobuf_endpoint::Inner::Http(HttpEndpoint {
151                    method: method.duplicate(),
152                    authority: authority.duplicate(),
153                    path: path.duplicate(),
154                    status: status.map(|s| s as u32),
155                    reason: reason.duplicate(),
156                }),
157                EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
158            };
159
160            ManuallyDrop::new(ProtobufAccessLog {
161                backend_address: self.backend_address.map(Into::into),
162                backend_id: self.context.backend_id.duplicate(),
163                bytes_in: self.bytes_in as u64,
164                bytes_out: self.bytes_out as u64,
165                client_rtt: self.client_rtt.map(|t| t.as_micros() as u64),
166                cluster_id: self.context.cluster_id.duplicate(),
167                endpoint: ProtobufEndpoint {
168                    inner: Some(endpoint),
169                },
170                message: self.message.duplicate(),
171                protocol: self.protocol.duplicate(),
172                request_id: self.context.request_id.into(),
173                response_time: self.response_time.map(|t| t.as_micros() as u64),
174                server_rtt: self.server_rtt.map(|t| t.as_micros() as u64),
175                service_time: self.service_time.as_micros() as u64,
176                session_address: self.session_address.map(Into::into),
177                tags: self
178                    .tags
179                    .map(|tags| tags.tags.duplicate())
180                    .unwrap_or_default(),
181                user_agent: self.user_agent.duplicate(),
182                tag: self.tag.duplicate(),
183                time: self.precise_time.into(),
184                request_time: Some(self.request_time.as_micros() as u64),
185            })
186        }
187    }
188}