sozu_lib/protocol/kawa_h1/
editor.rs1use std::{
2 net::{IpAddr, SocketAddr},
3 str::{from_utf8, from_utf8_unchecked},
4};
5
6use rusty_ulid::Ulid;
7use sozu_command_lib::logging::LogContext;
8
9use crate::{
10 Protocol,
11 pool::Checkout,
12 protocol::http::{GenericHttpStream, Method, parser::compare_no_case},
13};
14
/// Parses the value of a W3C Trace Context `traceparent` header.
///
/// The expected layout is `00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`
/// with nothing after the flags. On success the trace id and parent id are
/// returned as their ASCII hex bytes; the flags are validated but discarded.
///
/// Returns `None` when:
/// - the version is not `00`,
/// - a field is too short, not hexadecimal, or a `-` separator is missing,
/// - trailing bytes follow the flags,
/// - any hex digit is uppercase (the specification only allows lowercase),
/// - the trace id or the parent id is all zeroes (explicitly invalid per the
///   specification, receivers must ignore such a header).
#[cfg(feature = "opentelemetry")]
fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
    let val = val.data(buf);
    // `parse_hex` accepts both cases, but Trace Context mandates lowercase hex,
    // so any uppercase ASCII byte makes the whole header invalid.
    if val.iter().any(u8::is_ascii_uppercase) {
        return None;
    }
    let (version, val) = parse_hex::<2>(val)?;
    if version.as_slice() != b"00" {
        return None;
    }
    let val = skip_separator(val)?;
    let (trace_id, val) = parse_hex::<32>(val)?;
    let val = skip_separator(val)?;
    let (parent_id, val) = parse_hex::<16>(val)?;
    let val = skip_separator(val)?;
    let (_flags, val) = parse_hex::<2>(val)?;
    if !val.is_empty() {
        return None;
    }
    // All-zero identifiers are reserved as "invalid" and must not be propagated.
    let is_all_zero = |id: &[u8]| id.iter().all(|b| *b == b'0');
    if is_all_zero(&trace_id) || is_all_zero(&parent_id) {
        return None;
    }
    Some((trace_id, parent_id))
}
30
31#[cfg(feature = "opentelemetry")]
/// Takes the first `N` bytes of `buf` if they are all ASCII hexadecimal digits
/// (either case).
///
/// Returns those `N` bytes as an array together with the unconsumed tail, or
/// `None` when `buf` is shorter than `N` or a non-hex byte is found.
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
    if buf.len() < N {
        return None;
    }
    let (head, tail) = buf.split_at(N);
    if !head.iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    let mut digits = [0u8; N];
    digits.copy_from_slice(head);
    Some((digits, tail))
}
38
39#[cfg(feature = "opentelemetry")]
/// Consumes a single leading `-` from `buf`.
///
/// Returns the bytes after the dash, or `None` when `buf` is empty or starts
/// with any other byte.
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
    match buf {
        [b'-', rest @ ..] => Some(rest),
        _ => None,
    }
}
43
/// Generates `N` random lowercase hexadecimal ASCII characters.
///
/// Each byte is drawn independently from `0123456789abcdef` using the
/// thread-local random number generator.
#[cfg(feature = "opentelemetry")]
fn random_id<const N: usize>() -> [u8; N] {
    use rand::Rng;
    const CHARSET: &[u8] = b"0123456789abcdef";
    let mut rng = rand::thread_rng();
    let mut id = [0; N];
    for slot in id.iter_mut() {
        let index = rng.gen_range(0..CHARSET.len());
        *slot = CHARSET[index];
    }
    id
}
56
57#[cfg(feature = "opentelemetry")]
/// Serializes a `traceparent` header value.
///
/// The output is always `00-<trace_id>-<parent_id>-01`: version `00`, the two
/// identifiers copied verbatim, and the flags field fixed to `01`.
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
    let mut header = [0u8; 55];
    // 3 + 32 + 1 + 16 + 3 = 55 bytes, exactly filling `header`.
    let parts = b"00-"
        .iter()
        .chain(trace_id)
        .chain(b"-")
        .chain(parent_id)
        .chain(b"-01");
    for (slot, byte) in header.iter_mut().zip(parts) {
        *slot = *byte;
    }
    header
}
67
68#[derive(Debug)]
70pub struct HttpContext {
71 pub keep_alive_backend: bool,
74 pub keep_alive_frontend: bool,
76 pub sticky_session_found: Option<String>,
78 pub method: Option<Method>,
81 pub authority: Option<String>,
83 pub path: Option<String>,
85 pub status: Option<u16>,
87 pub reason: Option<String>,
89 pub user_agent: Option<String>,
91
92 #[cfg(feature = "opentelemetry")]
93 pub otel: Option<sozu_command::logging::OpenTelemetry>,
94
95 pub closing: bool,
98 pub id: Ulid,
100 pub backend_id: Option<String>,
101 pub cluster_id: Option<String>,
102 pub protocol: Protocol,
104 pub public_address: SocketAddr,
106 pub session_address: Option<SocketAddr>,
108 pub sticky_name: String,
110 pub sticky_session: Option<String>,
113}
114
115impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
116 fn on_headers(&mut self, stream: &mut GenericHttpStream) {
117 match stream.kind {
118 kawa::Kind::Request => self.on_request_headers(stream),
119 kawa::Kind::Response => self.on_response_headers(stream),
120 }
121 }
122}
123
124impl HttpContext {
125 pub fn new(
127 request_id: Ulid,
128 protocol: Protocol,
129 public_address: SocketAddr,
130 session_address: Option<SocketAddr>,
131 sticky_name: String,
132 ) -> Self {
133 Self {
134 id: request_id,
135 backend_id: None,
136 cluster_id: None,
137
138 closing: false,
139 keep_alive_backend: true,
140 keep_alive_frontend: true,
141 protocol,
142 public_address,
143 session_address,
144 sticky_name,
145 sticky_session: None,
146 sticky_session_found: None,
147
148 method: None,
149 authority: None,
150 path: None,
151 status: None,
152 reason: None,
153 user_agent: None,
154
155 #[cfg(feature = "opentelemetry")]
156 otel: Default::default(),
157 }
158 }
159
160 fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
171 let buf = request.storage.mut_buffer();
172
173 if let kawa::StatusLine::Request {
175 method,
176 authority,
177 path,
178 ..
179 } = &request.detached.status_line
180 {
181 self.method = method.data_opt(buf).map(Method::new);
182 self.authority = authority
183 .data_opt(buf)
184 .and_then(|data| from_utf8(data).ok())
185 .map(ToOwned::to_owned);
186 self.path = path
187 .data_opt(buf)
188 .and_then(|data| from_utf8(data).ok())
189 .map(ToOwned::to_owned);
190 }
191
192 let public_ip = self.public_address.ip();
197 let public_port = self.public_address.port();
198 let proto = match self.protocol {
199 Protocol::HTTP => "http",
200 Protocol::HTTPS => "https",
201 _ => unreachable!(),
202 };
203
204 for cookie in &mut request.detached.jar {
207 let key = cookie.key.data(buf);
208 if key == self.sticky_name.as_bytes() {
209 let val = cookie.val.data(buf);
210 self.sticky_session_found = from_utf8(val).ok().map(|val| val.to_string());
211 cookie.elide();
212 }
213 }
214
215 let mut x_for = None;
224 let mut forwarded = None;
225 let mut has_x_port = false;
226 let mut has_x_proto = false;
227 let mut has_connection = false;
228 #[cfg(feature = "opentelemetry")]
229 let mut traceparent: Option<&mut kawa::Pair> = None;
230 #[cfg(feature = "opentelemetry")]
231 let mut tracestate: Option<&mut kawa::Pair> = None;
232 for block in &mut request.blocks {
233 match block {
234 kawa::Block::Header(header) if !header.is_elided() => {
235 let key = header.key.data(buf);
236 if compare_no_case(key, b"connection") {
237 has_connection = true;
238 if self.closing {
239 header.val = kawa::Store::Static(b"close");
240 } else {
241 let val = header.val.data(buf);
242 self.keep_alive_frontend &= !compare_no_case(val, b"close");
243 }
244 } else if compare_no_case(key, b"X-Forwarded-Proto") {
245 has_x_proto = true;
246 incr!("http.trusting.x_proto");
248 let val = header.val.data(buf);
249 if !compare_no_case(val, proto.as_bytes()) {
250 incr!("http.trusting.x_proto.diff");
251 debug!(
252 "Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
253 self.authority, val, proto
254 );
255 }
256 } else if compare_no_case(key, b"X-Forwarded-Port") {
257 has_x_port = true;
258 incr!("http.trusting.x_port");
260 let val = header.val.data(buf);
261 let expected = public_port.to_string();
262 if !compare_no_case(val, expected.as_bytes()) {
263 incr!("http.trusting.x_port.diff");
264 debug!(
265 "Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
266 self.authority, val, expected
267 );
268 }
269 } else if compare_no_case(key, b"X-Forwarded-For") {
270 x_for = Some(header);
271 } else if compare_no_case(key, b"Forwarded") {
272 forwarded = Some(header);
273 } else if compare_no_case(key, b"User-Agent") {
274 self.user_agent = header
275 .val
276 .data_opt(buf)
277 .and_then(|data| from_utf8(data).ok())
278 .map(ToOwned::to_owned);
279 } else {
280 #[cfg(feature = "opentelemetry")]
281 if compare_no_case(key, b"traceparent") {
282 if let Some(hdr) = traceparent {
283 hdr.elide();
284 }
285 traceparent = Some(header);
286 } else if compare_no_case(key, b"tracestate") {
287 if let Some(hdr) = tracestate {
288 hdr.elide();
289 }
290 tracestate = Some(header);
291 }
292 }
293 }
294 _ => {}
295 }
296 }
297
298 #[cfg(feature = "opentelemetry")]
299 let (otel, has_traceparent) = {
300 let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
301 let tp = traceparent
302 .as_ref()
303 .and_then(|hdr| parse_traceparent(&hdr.val, buf))
304 .map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
305 if let (None, Some(tracestate)) = (tp, tracestate) {
307 tracestate.elide();
308 }
309 let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
310 otel.trace_id = trace_id;
311 otel.parent_span_id = parent_id;
312 otel.span_id = random_id();
313 if let Some(id) = &mut traceparent {
315 let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
316 id.val.modify(buf, &new_val);
317 }
318 (otel, traceparent.is_some())
319 };
320
321 if let Some(peer_addr) = self.session_address {
325 let peer_ip = peer_addr.ip();
326 let peer_port = peer_addr.port();
327 let has_x_for = x_for.is_some();
328 let has_forwarded = forwarded.is_some();
329
330 if let Some(header) = x_for {
331 header.val = kawa::Store::from_string(format!("{}, {peer_ip}", unsafe {
332 from_utf8_unchecked(header.val.data(buf))
333 }));
334 }
335 if let Some(header) = &mut forwarded {
336 let value = unsafe { from_utf8_unchecked(header.val.data(buf)) };
337 let new_value = match public_ip {
338 IpAddr::V4(_) => {
339 format!(
340 "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}"
341 )
342 }
343 IpAddr::V6(_) => {
344 format!(
345 "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\""
346 )
347 }
348 };
349 header.val = kawa::Store::from_string(new_value);
350 }
351
352 if !has_x_for {
353 request.push_block(kawa::Block::Header(kawa::Pair {
354 key: kawa::Store::Static(b"X-Forwarded-For"),
355 val: kawa::Store::from_string(peer_ip.to_string()),
356 }));
357 }
358 if !has_forwarded {
359 let value = match public_ip {
360 IpAddr::V4(_) => {
361 format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}")
362 }
363 IpAddr::V6(_) => {
364 format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\"")
365 }
366 };
367 request.push_block(kawa::Block::Header(kawa::Pair {
368 key: kawa::Store::Static(b"Forwarded"),
369 val: kawa::Store::from_string(value),
370 }));
371 }
372 }
373
374 #[cfg(feature = "opentelemetry")]
375 {
376 if !has_traceparent {
377 let val = build_traceparent(&otel.trace_id, &otel.span_id);
378 request.push_block(kawa::Block::Header(kawa::Pair {
379 key: kawa::Store::Static(b"traceparent"),
380 val: kawa::Store::from_slice(&val),
381 }));
382 }
383 self.otel = Some(otel);
384 }
385
386 if !has_x_port {
387 request.push_block(kawa::Block::Header(kawa::Pair {
388 key: kawa::Store::Static(b"X-Forwarded-Port"),
389 val: kawa::Store::from_string(public_port.to_string()),
390 }));
391 }
392 if !has_x_proto {
393 request.push_block(kawa::Block::Header(kawa::Pair {
394 key: kawa::Store::Static(b"X-Forwarded-Proto"),
395 val: kawa::Store::Static(proto.as_bytes()),
396 }));
397 }
398 if !has_connection && self.closing {
400 request.push_block(kawa::Block::Header(kawa::Pair {
401 key: kawa::Store::Static(b"Connection"),
402 val: kawa::Store::Static(b"close"),
403 }));
404 }
405 request.push_block(kawa::Block::Header(kawa::Pair {
407 key: kawa::Store::Static(b"Sozu-Id"),
408 val: kawa::Store::from_string(self.id.to_string()),
409 }));
410 }
411
412 fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
420 let buf = &mut response.storage.mut_buffer();
421
422 if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
424 self.status = Some(*code);
425 self.reason = reason
426 .data_opt(buf)
427 .and_then(|data| from_utf8(data).ok())
428 .map(ToOwned::to_owned);
429 }
430
431 if self.method == Some(Method::Head) {
432 response.parsing_phase = kawa::ParsingPhase::Terminated;
433 }
434
435 for block in &mut response.blocks {
439 match block {
440 kawa::Block::Header(header) if !header.is_elided() => {
441 let key = header.key.data(buf);
442 if compare_no_case(key, b"connection") {
443 if self.closing {
444 header.val = kawa::Store::Static(b"close");
445 } else {
446 let val = header.val.data(buf);
447 self.keep_alive_backend &= !compare_no_case(val, b"close");
448 }
449 }
450 }
451 _ => {}
452 }
453 }
454
455 if let Some(sticky_session) = &self.sticky_session {
458 if self.sticky_session != self.sticky_session_found {
459 response.push_block(kawa::Block::Header(kawa::Pair {
460 key: kawa::Store::Static(b"Set-Cookie"),
461 val: kawa::Store::from_string(format!(
462 "{}={}; Path=/",
463 self.sticky_name, sticky_session
464 )),
465 }));
466 }
467 }
468
469 response.push_block(kawa::Block::Header(kawa::Pair {
471 key: kawa::Store::Static(b"Sozu-Id"),
472 val: kawa::Store::from_string(self.id.to_string()),
473 }));
474 }
475
476 pub fn reset(&mut self) {
477 self.keep_alive_backend = true;
478 self.keep_alive_frontend = true;
479 self.sticky_session_found = None;
480 self.method = None;
481 self.authority = None;
482 self.path = None;
483 self.status = None;
484 self.reason = None;
485 self.user_agent = None;
486 }
487
488 pub fn log_context(&self) -> LogContext<'_> {
489 LogContext {
490 request_id: self.id,
491 cluster_id: self.cluster_id.as_deref(),
492 backend_id: self.backend_id.as_deref(),
493 }
494 }
495}