sozu_lib/protocol/kawa_h1/
editor.rs

1use std::{
2    net::{IpAddr, SocketAddr},
3    str::{from_utf8, from_utf8_unchecked},
4};
5
6use rusty_ulid::Ulid;
7use sozu_command_lib::logging::LogContext;
8
9use crate::{
10    Protocol,
11    pool::Checkout,
12    protocol::http::{GenericHttpStream, Method, parser::compare_no_case},
13};
14
15#[cfg(feature = "opentelemetry")]
16fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
17    let val = val.data(buf);
18    let (version, val) = parse_hex::<2>(val)?;
19    if version.as_slice() != b"00" {
20        return None;
21    }
22    let val = skip_separator(val)?;
23    let (trace_id, val) = parse_hex::<32>(val)?;
24    let val = skip_separator(val)?;
25    let (parent_id, val) = parse_hex::<16>(val)?;
26    let val = skip_separator(val)?;
27    let (_, val) = parse_hex::<2>(val)?;
28    val.is_empty().then_some((trace_id, parent_id))
29}
30
31#[cfg(feature = "opentelemetry")]
/// Splits `N` ASCII hexadecimal digits off the front of `buf`.
///
/// Returns the digits (as the raw ASCII bytes, not decoded) together with the
/// remaining bytes, or `None` if `buf` is shorter than `N` or if one of the first
/// `N` bytes is not an ASCII hex digit (both cases are accepted: `0-9`, `a-f`, `A-F`).
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    if buf.len() < N {
        return None;
    }
    let (head, rest) = buf.split_at(N);
    if !head.iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    // `head` is exactly N bytes long thanks to the length guard above
    let mut digits = [0u8; N];
    digits.copy_from_slice(head);
    Some((digits, rest))
}
38
39#[cfg(feature = "opentelemetry")]
/// Consumes a single leading `-` from `buf`.
///
/// Returns the bytes following the dash, or `None` if `buf` is empty or starts
/// with any other byte.
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    match buf {
        [b'-', rest @ ..] => Some(rest),
        _ => None,
    }
}
43
44#[cfg(feature = "opentelemetry")]
45fn random_id<const N: usize>() -> [u8; N] {
46    use rand::Rng;
47    const CHARSET: &[u8] = b"0123456789abcdef";
48    let mut rng = rand::thread_rng();
49    let mut buf = [0; N];
50    buf.fill_with(|| {
51        let n = rng.gen_range(0..CHARSET.len());
52        CHARSET[n]
53    });
54    buf
55}
56
57#[cfg(feature = "opentelemetry")]
/// Serializes a `traceparent` header value.
///
/// The output is always `00-<trace_id>-<parent_id>-01`: version `00`, the two
/// identifiers copied verbatim (they are expected to already be hex-encoded) and
/// the flags byte fixed to `01`.
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    // 3 + 32 + 1 + 16 + 3 = 55 bytes, exactly the size of the output array
    let segments = b"00-"
        .iter()
        .chain(trace_id)
        .chain(b"-")
        .chain(parent_id)
        .chain(b"-01");

    let mut header = [0u8; 55];
    for (dst, src) in header.iter_mut().zip(segments) {
        *dst = *src;
    }
    header
}
67
/// This is the container used to store and use information about the session from within a Kawa parser callback
///
/// The fields are split in two groups:
/// - "Write only" fields are filled by the header callbacks
///   (`on_request_headers` / `on_response_headers`) for the rest of the proxy to consume
/// - "Read only" fields are set from the outside and only read by the callbacks
#[derive(Debug)]
pub struct HttpContext {
    // ========== Write only
    /// set to false if Kawa finds a "Connection" header with a "close" value in the response
    pub keep_alive_backend: bool,
    /// set to false if Kawa finds a "Connection" header with a "close" value in the request
    pub keep_alive_frontend: bool,
    /// the value of the sticky session cookie in the request
    pub sticky_session_found: Option<String>,
    // ---------- Status Line
    /// the value of the method in the request line
    pub method: Option<Method>,
    /// the value of the authority of the request (from the request line or the "Host" header)
    pub authority: Option<String>,
    /// the value of the path in the request line
    pub path: Option<String>,
    /// the value of the status code in the response line
    pub status: Option<u16>,
    /// the value of the reason in the response line
    pub reason: Option<String>,
    // ---------- Additional optional data
    /// the value of the "User-Agent" header of the request, if present and valid UTF-8
    pub user_agent: Option<String>,

    /// the trace id, span id and parent span id computed for the request
    /// (set by the request callback, from the "traceparent" header when it is valid)
    #[cfg(feature = "opentelemetry")]
    pub otel: Option<sozu_command::logging::OpenTelemetry>,

    // ========== Read only
    /// signals whether Kawa should write a "Connection" header with a "close" value (request and response)
    pub closing: bool,
    /// the value of the custom header, named "Sozu-Id", that Kawa should write (request and response)
    pub id: Ulid,
    /// exposed through `log_context`; presumably the id of the selected backend (set elsewhere)
    pub backend_id: Option<String>,
    /// exposed through `log_context`; presumably the id of the routed cluster (set elsewhere)
    pub cluster_id: Option<String>,
    /// the value of the protocol Kawa should write in the Forwarded headers of the request
    pub protocol: Protocol,
    /// the value of the public address Kawa should write in the Forwarded headers of the request
    pub public_address: SocketAddr,
    /// the value of the session address Kawa should write in the Forwarded headers of the request
    pub session_address: Option<SocketAddr>,
    /// the name of the cookie Kawa should read from the request to get the sticky session
    pub sticky_name: String,
    /// the sticky session that should be used
    /// used to create a "Set-Cookie" header in the response in case it differs from sticky_session_found
    pub sticky_session: Option<String>,
}
114
115impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
116    fn on_headers(&mut self, stream: &mut GenericHttpStream) {
117        match stream.kind {
118            kawa::Kind::Request => self.on_request_headers(stream),
119            kawa::Kind::Response => self.on_response_headers(stream),
120        }
121    }
122}
123
124impl HttpContext {
125    /// Creates a new instance
126    pub fn new(
127        request_id: Ulid,
128        protocol: Protocol,
129        public_address: SocketAddr,
130        session_address: Option<SocketAddr>,
131        sticky_name: String,
132    ) -> Self {
133        Self {
134            id: request_id,
135            backend_id: None,
136            cluster_id: None,
137
138            closing: false,
139            keep_alive_backend: true,
140            keep_alive_frontend: true,
141            protocol,
142            public_address,
143            session_address,
144            sticky_name,
145            sticky_session: None,
146            sticky_session_found: None,
147
148            method: None,
149            authority: None,
150            path: None,
151            status: None,
152            reason: None,
153            user_agent: None,
154
155            #[cfg(feature = "opentelemetry")]
156            otel: Default::default(),
157        }
158    }
159
160    /// Callback for request:
161    ///
162    /// - edit headers (connection, forwarded, sticky cookie, sozu-id)
163    /// - save information:
164    ///   - method
165    ///   - authority
166    ///   - path
167    ///   - front keep-alive
168    ///   - sticky cookie
169    ///   - user-agent
170    fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
171        let buf = request.storage.mut_buffer();
172
173        // Captures the request line
174        if let kawa::StatusLine::Request {
175            method,
176            authority,
177            path,
178            ..
179        } = &request.detached.status_line
180        {
181            self.method = method.data_opt(buf).map(Method::new);
182            self.authority = authority
183                .data_opt(buf)
184                .and_then(|data| from_utf8(data).ok())
185                .map(ToOwned::to_owned);
186            self.path = path
187                .data_opt(buf)
188                .and_then(|data| from_utf8(data).ok())
189                .map(ToOwned::to_owned);
190        }
191
192        // if self.method == Some(Method::Get) && request.body_size == kawa::BodySize::Empty {
193        //     request.parsing_phase = kawa::ParsingPhase::Terminated;
194        // }
195
196        let public_ip = self.public_address.ip();
197        let public_port = self.public_address.port();
198        let proto = match self.protocol {
199            Protocol::HTTP => "http",
200            Protocol::HTTPS => "https",
201            _ => unreachable!(),
202        };
203
204        // Find and remove the sticky_name cookie
205        // if found its value is stored in sticky_session_found
206        for cookie in &mut request.detached.jar {
207            let key = cookie.key.data(buf);
208            if key == self.sticky_name.as_bytes() {
209                let val = cookie.val.data(buf);
210                self.sticky_session_found = from_utf8(val).ok().map(|val| val.to_string());
211                cookie.elide();
212            }
213        }
214
215        // If found:
216        // - set Connection to "close" if closing is set
217        // - set keep_alive_frontend to false if Connection is "close"
218        // - update value of X-Forwarded-Proto
219        // - update value of X-Forwarded-Port
220        // - store X-Forwarded-For
221        // - store Forwarded
222        // - store User-Agent
223        let mut x_for = None;
224        let mut forwarded = None;
225        let mut has_x_port = false;
226        let mut has_x_proto = false;
227        let mut has_connection = false;
228        #[cfg(feature = "opentelemetry")]
229        let mut traceparent: Option<&mut kawa::Pair> = None;
230        #[cfg(feature = "opentelemetry")]
231        let mut tracestate: Option<&mut kawa::Pair> = None;
232        for block in &mut request.blocks {
233            match block {
234                kawa::Block::Header(header) if !header.is_elided() => {
235                    let key = header.key.data(buf);
236                    if compare_no_case(key, b"connection") {
237                        has_connection = true;
238                        if self.closing {
239                            header.val = kawa::Store::Static(b"close");
240                        } else {
241                            let val = header.val.data(buf);
242                            self.keep_alive_frontend &= !compare_no_case(val, b"close");
243                        }
244                    } else if compare_no_case(key, b"X-Forwarded-Proto") {
245                        has_x_proto = true;
246                        // header.val = kawa::Store::Static(proto.as_bytes());
247                        incr!("http.trusting.x_proto");
248                        let val = header.val.data(buf);
249                        if !compare_no_case(val, proto.as_bytes()) {
250                            incr!("http.trusting.x_proto.diff");
251                            debug!(
252                                "Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
253                                self.authority, val, proto
254                            );
255                        }
256                    } else if compare_no_case(key, b"X-Forwarded-Port") {
257                        has_x_port = true;
258                        // header.val = kawa::Store::from_string(public_port.to_string());
259                        incr!("http.trusting.x_port");
260                        let val = header.val.data(buf);
261                        let expected = public_port.to_string();
262                        if !compare_no_case(val, expected.as_bytes()) {
263                            incr!("http.trusting.x_port.diff");
264                            debug!(
265                                "Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
266                                self.authority, val, expected
267                            );
268                        }
269                    } else if compare_no_case(key, b"X-Forwarded-For") {
270                        x_for = Some(header);
271                    } else if compare_no_case(key, b"Forwarded") {
272                        forwarded = Some(header);
273                    } else if compare_no_case(key, b"User-Agent") {
274                        self.user_agent = header
275                            .val
276                            .data_opt(buf)
277                            .and_then(|data| from_utf8(data).ok())
278                            .map(ToOwned::to_owned);
279                    } else {
280                        #[cfg(feature = "opentelemetry")]
281                        if compare_no_case(key, b"traceparent") {
282                            if let Some(hdr) = traceparent {
283                                hdr.elide();
284                            }
285                            traceparent = Some(header);
286                        } else if compare_no_case(key, b"tracestate") {
287                            if let Some(hdr) = tracestate {
288                                hdr.elide();
289                            }
290                            tracestate = Some(header);
291                        }
292                    }
293                }
294                _ => {}
295            }
296        }
297
298        #[cfg(feature = "opentelemetry")]
299        let (otel, has_traceparent) = {
300            let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
301            let tp = traceparent
302                .as_ref()
303                .and_then(|hdr| parse_traceparent(&hdr.val, buf))
304                .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
305            // Remove tracestate if no traceparent is present
306            if let (None, Some(tracestate)) = (tp, tracestate) {
307                tracestate.elide();
308            }
309            let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
310            otel.trace_id = trace_id;
311            otel.parent_span_id = parent_id;
312            otel.span_id = random_id();
313            // Modify header if present
314            if let Some(id) = &mut traceparent {
315                let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
316                id.val.modify(buf, &new_val);
317            }
318            (otel, traceparent.is_some())
319        };
320
321        // If session_address is set:
322        // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
323        // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
324        if let Some(peer_addr) = self.session_address {
325            let peer_ip = peer_addr.ip();
326            let peer_port = peer_addr.port();
327            let has_x_for = x_for.is_some();
328            let has_forwarded = forwarded.is_some();
329
330            if let Some(header) = x_for {
331                header.val = kawa::Store::from_string(format!("{}, {peer_ip}", unsafe {
332                    from_utf8_unchecked(header.val.data(buf))
333                }));
334            }
335            if let Some(header) = &mut forwarded {
336                let value = unsafe { from_utf8_unchecked(header.val.data(buf)) };
337                let new_value = match public_ip {
338                    IpAddr::V4(_) => {
339                        format!(
340                            "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}"
341                        )
342                    }
343                    IpAddr::V6(_) => {
344                        format!(
345                            "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\""
346                        )
347                    }
348                };
349                header.val = kawa::Store::from_string(new_value);
350            }
351
352            if !has_x_for {
353                request.push_block(kawa::Block::Header(kawa::Pair {
354                    key: kawa::Store::Static(b"X-Forwarded-For"),
355                    val: kawa::Store::from_string(peer_ip.to_string()),
356                }));
357            }
358            if !has_forwarded {
359                let value = match public_ip {
360                    IpAddr::V4(_) => {
361                        format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}")
362                    }
363                    IpAddr::V6(_) => {
364                        format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\"")
365                    }
366                };
367                request.push_block(kawa::Block::Header(kawa::Pair {
368                    key: kawa::Store::Static(b"Forwarded"),
369                    val: kawa::Store::from_string(value),
370                }));
371            }
372        }
373
374        #[cfg(feature = "opentelemetry")]
375        {
376            if !has_traceparent {
377                let val = build_traceparent(&otel.trace_id, &otel.span_id);
378                request.push_block(kawa::Block::Header(kawa::Pair {
379                    key: kawa::Store::Static(b"traceparent"),
380                    val: kawa::Store::from_slice(&val),
381                }));
382            }
383            self.otel = Some(otel);
384        }
385
386        if !has_x_port {
387            request.push_block(kawa::Block::Header(kawa::Pair {
388                key: kawa::Store::Static(b"X-Forwarded-Port"),
389                val: kawa::Store::from_string(public_port.to_string()),
390            }));
391        }
392        if !has_x_proto {
393            request.push_block(kawa::Block::Header(kawa::Pair {
394                key: kawa::Store::Static(b"X-Forwarded-Proto"),
395                val: kawa::Store::Static(proto.as_bytes()),
396            }));
397        }
398        // Create a "Connection" header in case it was not found and closing it set
399        if !has_connection && self.closing {
400            request.push_block(kawa::Block::Header(kawa::Pair {
401                key: kawa::Store::Static(b"Connection"),
402                val: kawa::Store::Static(b"close"),
403            }));
404        }
405        // Create a custom "Sozu-Id" header
406        request.push_block(kawa::Block::Header(kawa::Pair {
407            key: kawa::Store::Static(b"Sozu-Id"),
408            val: kawa::Store::from_string(self.id.to_string()),
409        }));
410    }
411
412    /// Callback for response:
413    ///
414    /// - edit headers (connection, set-cookie, sozu-id)
415    /// - save information:
416    ///   - status code
417    ///   - reason
418    ///   - back keep-alive
419    fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
420        let buf = &mut response.storage.mut_buffer();
421
422        // Captures the response line
423        if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
424            self.status = Some(*code);
425            self.reason = reason
426                .data_opt(buf)
427                .and_then(|data| from_utf8(data).ok())
428                .map(ToOwned::to_owned);
429        }
430
431        if self.method == Some(Method::Head) {
432            response.parsing_phase = kawa::ParsingPhase::Terminated;
433        }
434
435        // If found:
436        // - set Connection to "close" if closing is set
437        // - set keep_alive_backend to false if Connection is "close"
438        for block in &mut response.blocks {
439            match block {
440                kawa::Block::Header(header) if !header.is_elided() => {
441                    let key = header.key.data(buf);
442                    if compare_no_case(key, b"connection") {
443                        if self.closing {
444                            header.val = kawa::Store::Static(b"close");
445                        } else {
446                            let val = header.val.data(buf);
447                            self.keep_alive_backend &= !compare_no_case(val, b"close");
448                        }
449                    }
450                }
451                _ => {}
452            }
453        }
454
455        // If the sticky_session is set and differs from the one found in the request
456        // create a "Set-Cookie" header to update the sticky_name value
457        if let Some(sticky_session) = &self.sticky_session {
458            if self.sticky_session != self.sticky_session_found {
459                response.push_block(kawa::Block::Header(kawa::Pair {
460                    key: kawa::Store::Static(b"Set-Cookie"),
461                    val: kawa::Store::from_string(format!(
462                        "{}={}; Path=/",
463                        self.sticky_name, sticky_session
464                    )),
465                }));
466            }
467        }
468
469        // Create a custom "Sozu-Id" header
470        response.push_block(kawa::Block::Header(kawa::Pair {
471            key: kawa::Store::Static(b"Sozu-Id"),
472            val: kawa::Store::from_string(self.id.to_string()),
473        }));
474    }
475
476    pub fn reset(&mut self) {
477        self.keep_alive_backend = true;
478        self.keep_alive_frontend = true;
479        self.sticky_session_found = None;
480        self.method = None;
481        self.authority = None;
482        self.path = None;
483        self.status = None;
484        self.reason = None;
485        self.user_agent = None;
486    }
487
488    pub fn log_context(&self) -> LogContext<'_> {
489        LogContext {
490            request_id: self.id,
491            cluster_id: self.cluster_id.as_deref(),
492            backend_id: self.backend_id.as_deref(),
493        }
494    }
495}