1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr};

use rusty_ulid::Ulid;
use time::Duration;

use crate::{
    logging::{LogLevel, Rfc3339Time},
    proto::command::{
        protobuf_endpoint, HttpEndpoint, ProtobufAccessLog, ProtobufEndpoint, TcpEndpoint,
    },
};

/// Uses `unsafe` to create a "fake" second owner of the underlying data.
///
/// Beware that for the compiler the duplicate is as legitimate as the original
/// owner, so you have to neutralise one of them (with [`std::mem::forget`] or
/// [`ManuallyDrop`]) before it is dropped, to avoid a double free.
///
/// This trait is implemented in this file for `&T`, `&str`, `&[T]` and for an
/// `Option` wrapping any of those references.
///
/// After a performance review, it does not seem to be any more efficient than
/// calling `clone()`, probably because malloc's cache is so well optimized
/// these days.
trait DuplicateOwnership {
    /// Owned type produced from the borrowed value.
    type Target;
    /// Builds an owner that aliases the data behind `self` instead of copying it.
    ///
    /// # Safety
    ///
    /// The returned value shares its storage with the original owner. Don't
    /// forget to use [`std::mem::forget`] or [`ManuallyDrop`] over one of your
    /// owners, otherwise the shared data is dropped twice.
    unsafe fn duplicate(self) -> Self::Target;
}

impl<T> DuplicateOwnership for &T {
    type Target = T;

    /// Produces a bitwise copy of the referenced value without cloning it.
    ///
    /// # Safety
    ///
    /// The copy aliases every resource owned by the original value; the caller
    /// must make sure only one of the two owners is ever dropped.
    unsafe fn duplicate(self) -> T {
        let source: *const T = self;
        // SAFETY: `source` is derived from a live shared reference, so it is
        // valid for reads, properly aligned and points to an initialised `T`.
        // Preventing the double drop is delegated to the caller.
        unsafe { source.read() }
    }
}
impl<'a, T> DuplicateOwnership for Option<&'a T>
where
    T: ?Sized,
    &'a T: DuplicateOwnership,
{
    type Target = Option<<&'a T as DuplicateOwnership>::Target>;

    /// Duplicates the referenced value when there is one; `None` stays `None`.
    ///
    /// # Safety
    ///
    /// Same contract as the `duplicate` implementation of the wrapped
    /// reference: the caller must keep one of the two owners from being dropped.
    unsafe fn duplicate(self) -> Self::Target {
        let borrowed = self?;
        // SAFETY: the obligations of the inner `duplicate` are forwarded
        // unchanged to our own caller.
        Some(unsafe { borrowed.duplicate() })
    }
}
impl DuplicateOwnership for &str {
    type Target = String;

    /// Builds a `String` that points at the bytes of this slice, without copying.
    ///
    /// # Safety
    ///
    /// The returned `String` does not own its buffer: the caller must keep it
    /// from being dropped (for example inside a `ManuallyDrop`).
    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        let data = self.as_ptr() as *mut u8;
        // SAFETY: `data` points to `len` bytes of valid UTF-8 since it comes from
        // a `&str`. Length and capacity are both set to `len`. The buffer is not
        // owned by the result, so the caller must never let it be dropped.
        unsafe { String::from_raw_parts(data, len, len) }
    }
}
impl<T> DuplicateOwnership for &[T] {
    type Target = Vec<T>;

    /// Builds a `Vec` that points at the elements of this slice, without copying.
    ///
    /// # Safety
    ///
    /// The returned `Vec` does not own its buffer: the caller must keep it from
    /// being dropped (for example inside a `ManuallyDrop`).
    unsafe fn duplicate(self) -> Self::Target {
        let len = self.len();
        let data = self.as_ptr() as *mut T;
        // SAFETY: `data` points to `len` initialised elements since it comes from
        // a `&[T]`. Length and capacity are both set to `len`. The buffer is not
        // owned by the result, so the caller must never let it be dropped.
        unsafe { Vec::from_raw_parts(data, len, len) }
    }
}

/// Newtype around an optional, borrowed log message.
// NOTE(review): not used in this part of the file; presumably exists to carry
// formatting impls defined elsewhere — confirm.
pub struct LogMessage<'a>(pub Option<&'a str>);
/// Newtype around an optional [`Duration`].
// NOTE(review): not used in this part of the file; presumably exists to carry
// formatting impls defined elsewhere — confirm.
pub struct LogDuration(pub Option<Duration>);

/// Identifiers attached to a log entry: the request id plus, when known,
/// the ids of the cluster and backend involved.
#[derive(Debug)]
pub struct LogContext<'a> {
    /// Identifier of the request, as a ULID.
    pub request_id: Ulid,
    /// Borrowed cluster id, if any.
    pub cluster_id: Option<&'a str>,
    /// Borrowed backend id, if any.
    pub backend_id: Option<&'a str>,
}

/// Protocol-specific part of a [`RequestRecord`].
///
/// All string fields are borrowed; every HTTP field is optional.
pub enum EndpointRecord<'a> {
    /// Details of an HTTP exchange.
    Http {
        /// HTTP method, if known.
        method: Option<&'a str>,
        /// Authority, if known.
        authority: Option<&'a str>,
        /// Path, if known.
        path: Option<&'a str>,
        /// Status code, if known (widened to `u32` in the protobuf form).
        status: Option<u16>,
        /// Reason text, if known.
        reason: Option<&'a str>,
    },
    /// TCP endpoint: carries no additional data.
    Tcp,
}

/// Used to aggregate tags in a session.
///
/// Keeps the tags both as a map and as a single pre-rendered string, so the
/// string does not have to be rebuilt each time it is needed.
#[derive(Debug)]
pub struct CachedTags {
    /// The tags, as key/value pairs ordered by key.
    pub tags: BTreeMap<String, String>,
    /// All tags rendered as `key=value`, separated by `", "`
    /// (see [`CachedTags::new`]).
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of `tags` and pre-renders them as a single string.
    ///
    /// Each entry is rendered as `key=value`; entries follow the map's key order
    /// and are separated by `", "`. An empty map yields an empty string.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (position, (key, value)) in tags.iter().enumerate() {
            // The separator goes between entries only, never before the first one.
            if position > 0 {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}

/// Borrowed view over the tag-related data of a record, as returned by
/// `RequestRecord::full_tags`.
#[derive(Debug)]
pub struct FullTags<'a> {
    /// The pre-concatenated tags string, if the record has tags.
    pub concatenated: Option<&'a str>,
    /// The user agent, if the record has one.
    pub user_agent: Option<&'a str>,
}

/// Intermediate representation of an access log, agnostic of the final format.
/// Every non-`Copy` field is a reference, to avoid capturing ownership (as a
/// logger should).
pub struct RequestRecord<'a> {
    /// Optional free-form message.
    pub message: Option<&'a str>,
    /// Request, cluster and backend identifiers.
    pub context: LogContext<'a>,
    /// Socket address of the session, if known.
    pub session_address: Option<SocketAddr>,
    /// Socket address of the backend, if known.
    pub backend_address: Option<SocketAddr>,
    /// Name of the protocol.
    pub protocol: &'a str,
    /// Protocol-specific details (HTTP or TCP).
    pub endpoint: EndpointRecord<'a>,
    /// Session tags, with their pre-concatenated form.
    pub tags: Option<&'a CachedTags>,
    /// Round-trip time on the client side, if measured.
    pub client_rtt: Option<Duration>,
    /// Round-trip time on the server side, if measured.
    pub server_rtt: Option<Duration>,
    /// User agent, if any.
    pub user_agent: Option<&'a str>,
    /// Service time (exported as whole microseconds in the protobuf form).
    pub service_time: Duration,
    /// Response time (exported as whole microseconds in the protobuf form).
    pub response_time: Duration,
    /// Byte count, inbound.
    pub bytes_in: usize,
    /// Byte count, outbound.
    pub bytes_out: usize,

    // added by the logger itself
    /// Process id.
    pub pid: i32,
    /// Logger tag.
    pub tag: &'a str,
    /// Level of the log entry.
    pub level: LogLevel,
    /// Timestamp in RFC 3339 form.
    pub now: Rfc3339Time,
    /// Timestamp as an integer; exported as `time` in the protobuf form.
    // NOTE(review): unit and epoch are not visible in this file — confirm.
    pub precise_time: i128,
}

impl RequestRecord<'_> {
    /// Returns the tag-related parts of the record as borrowed string slices:
    /// the pre-concatenated session tags (if any) and the user agent (if any).
    pub fn full_tags(&self) -> FullTags<'_> {
        FullTags {
            // `self.tags` is an `Option<&CachedTags>`, which is `Copy`, so it can
            // be mapped directly without `as_ref()`.
            concatenated: self.tags.map(|cached| cached.concatenated.as_str()),
            user_agent: self.user_agent,
        }
    }

    /// Converts the RequestRecord into its protobuf representation.
    ///
    /// Prost needs ownership over all the fields but we don't want to take it from the user
    /// or clone them, so we use the unsafe DuplicateOwnership.
    ///
    /// The returned message therefore aliases the data borrowed by this record.
    /// It is wrapped in [`ManuallyDrop`] because its destructor must never run:
    /// do not call `ManuallyDrop::drop` on it, and do not extract and drop the
    /// inner value. It should not be used once the borrowed data is gone.
    pub fn into_binary_access_log(self) -> ManuallyDrop<ProtobufAccessLog> {
        // SAFETY: every value produced by `duplicate()` below is moved into the
        // `ProtobufAccessLog`, which is immediately wrapped in `ManuallyDrop`, so
        // this function never drops any of the aliased data itself.
        // NOTE(review): `endpoint` is a plain local until it is moved into the
        // message; a panic in between would drop its duplicated strings.
        unsafe {
            let endpoint = match self.endpoint {
                EndpointRecord::Http {
                    method,
                    authority,
                    path,
                    status,
                    reason,
                } => protobuf_endpoint::Inner::Http(HttpEndpoint {
                    method: method.duplicate(),
                    authority: authority.duplicate(),
                    path: path.duplicate(),
                    // Lossless widening from `u16`.
                    status: status.map(u32::from),
                    reason: reason.duplicate(),
                }),
                EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
            };

            // NOTE(review): the `as u64` casts on durations keep the low 64 bits
            // of the microsecond count; a negative duration would wrap — confirm
            // that durations are never negative here.
            ManuallyDrop::new(ProtobufAccessLog {
                backend_address: self.backend_address.map(Into::into),
                backend_id: self.context.backend_id.duplicate(),
                bytes_in: self.bytes_in as u64,
                bytes_out: self.bytes_out as u64,
                client_rtt: self.client_rtt.map(|t| t.whole_microseconds() as u64),
                cluster_id: self.context.cluster_id.duplicate(),
                endpoint: ProtobufEndpoint {
                    inner: Some(endpoint),
                },
                message: self.message.duplicate(),
                protocol: self.protocol.duplicate(),
                request_id: self.context.request_id.into(),
                response_time: self.response_time.whole_microseconds() as u64,
                server_rtt: self.server_rtt.map(|t| t.whole_microseconds() as u64),
                service_time: self.service_time.whole_microseconds() as u64,
                session_address: self.session_address.map(Into::into),
                // The whole map is duplicated bitwise (through the `&T` impl); an
                // absent tag set becomes an empty map.
                tags: self
                    .tags
                    .map(|tags| tags.tags.duplicate())
                    .unwrap_or_default(),
                user_agent: self.user_agent.duplicate(),
                tag: self.tag.duplicate(),
                time: self.precise_time.into(),
            })
        }
    }
}