sozu_lib/protocol/kawa_h1/
editor.rs1use std::{
2 net::{IpAddr, SocketAddr},
3 str::{from_utf8, from_utf8_unchecked},
4};
5
6use rusty_ulid::Ulid;
7
8use crate::{
9 pool::Checkout,
10 protocol::http::{parser::compare_no_case, GenericHttpStream, Method},
11 Protocol,
12};
13
14use sozu_command_lib::logging::LogContext;
15
16#[derive(Debug)]
18pub struct HttpContext {
19 pub keep_alive_backend: bool,
22 pub keep_alive_frontend: bool,
24 pub sticky_session_found: Option<String>,
26 pub method: Option<Method>,
29 pub authority: Option<String>,
31 pub path: Option<String>,
33 pub status: Option<u16>,
35 pub reason: Option<String>,
37 pub user_agent: Option<String>,
39
40 pub closing: bool,
43 pub id: Ulid,
45 pub backend_id: Option<String>,
46 pub cluster_id: Option<String>,
47 pub protocol: Protocol,
49 pub public_address: SocketAddr,
51 pub session_address: Option<SocketAddr>,
53 pub sticky_name: String,
55 pub sticky_session: Option<String>,
58}
59
60impl kawa::h1::ParserCallbacks<Checkout> for HttpContext {
61 fn on_headers(&mut self, stream: &mut GenericHttpStream) {
62 match stream.kind {
63 kawa::Kind::Request => self.on_request_headers(stream),
64 kawa::Kind::Response => self.on_response_headers(stream),
65 }
66 }
67}
68
69impl HttpContext {
70 fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
81 let buf = &mut request.storage.mut_buffer();
82
83 if let kawa::StatusLine::Request {
85 method,
86 authority,
87 path,
88 ..
89 } = &request.detached.status_line
90 {
91 self.method = method.data_opt(buf).map(Method::new);
92 self.authority = authority
93 .data_opt(buf)
94 .and_then(|data| from_utf8(data).ok())
95 .map(ToOwned::to_owned);
96 self.path = path
97 .data_opt(buf)
98 .and_then(|data| from_utf8(data).ok())
99 .map(ToOwned::to_owned);
100 }
101
102 let public_ip = self.public_address.ip();
107 let public_port = self.public_address.port();
108 let proto = match self.protocol {
109 Protocol::HTTP => "http",
110 Protocol::HTTPS => "https",
111 _ => unreachable!(),
112 };
113
114 for cookie in &mut request.detached.jar {
117 let key = cookie.key.data(buf);
118 if key == self.sticky_name.as_bytes() {
119 let val = cookie.val.data(buf);
120 self.sticky_session_found = from_utf8(val).ok().map(|val| val.to_string());
121 cookie.elide();
122 }
123 }
124
125 let mut x_for = None;
134 let mut forwarded = None;
135 let mut has_x_port = false;
136 let mut has_x_proto = false;
137 let mut has_connection = false;
138 for block in &mut request.blocks {
139 match block {
140 kawa::Block::Header(header) if !header.is_elided() => {
141 let key = header.key.data(buf);
142 if compare_no_case(key, b"connection") {
143 has_connection = true;
144 if self.closing {
145 header.val = kawa::Store::Static(b"close");
146 } else {
147 let val = header.val.data(buf);
148 self.keep_alive_frontend &= !compare_no_case(val, b"close");
149 }
150 } else if compare_no_case(key, b"X-Forwarded-Proto") {
151 has_x_proto = true;
152 incr!("http.trusting.x_proto");
154 let val = header.val.data(buf);
155 if !compare_no_case(val, proto.as_bytes()) {
156 incr!("http.trusting.x_proto.diff");
157 debug!(
158 "Trusting X-Forwarded-Proto for {:?} even though {:?} != {}",
159 self.authority, val, proto
160 );
161 }
162 } else if compare_no_case(key, b"X-Forwarded-Port") {
163 has_x_port = true;
164 incr!("http.trusting.x_port");
166 let val = header.val.data(buf);
167 let expected = public_port.to_string();
168 if !compare_no_case(val, expected.as_bytes()) {
169 incr!("http.trusting.x_port.diff");
170 debug!(
171 "Trusting X-Forwarded-Port for {:?} even though {:?} != {}",
172 self.authority, val, expected
173 );
174 }
175 } else if compare_no_case(key, b"X-Forwarded-For") {
176 x_for = Some(header);
177 } else if compare_no_case(key, b"Forwarded") {
178 forwarded = Some(header);
179 } else if compare_no_case(key, b"User-Agent") {
180 self.user_agent = header
181 .val
182 .data_opt(buf)
183 .and_then(|data| from_utf8(data).ok())
184 .map(ToOwned::to_owned);
185 }
186 }
187 _ => {}
188 }
189 }
190
191 if let Some(peer_addr) = self.session_address {
195 let peer_ip = peer_addr.ip();
196 let peer_port = peer_addr.port();
197 let has_x_for = x_for.is_some();
198 let has_forwarded = forwarded.is_some();
199
200 if let Some(header) = x_for {
201 header.val = kawa::Store::from_string(format!("{}, {peer_ip}", unsafe {
202 from_utf8_unchecked(header.val.data(buf))
203 }));
204 }
205 if let Some(header) = &mut forwarded {
206 let value = unsafe { from_utf8_unchecked(header.val.data(buf)) };
207 let new_value = match (peer_ip, public_ip) {
208 (IpAddr::V4(_), IpAddr::V4(_)) => {
209 format!("{value}, proto={proto};for={peer_ip}:{peer_port};by={public_ip}")
210 }
211 (IpAddr::V4(_), IpAddr::V6(_)) => {
212 format!(
213 "{value}, proto={proto};for={peer_ip}:{peer_port};by=\"{public_ip}\""
214 )
215 }
216 (IpAddr::V6(_), IpAddr::V4(_)) => {
217 format!(
218 "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}"
219 )
220 }
221 (IpAddr::V6(_), IpAddr::V6(_)) => {
222 format!(
223 "{value}, proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\""
224 )
225 }
226 };
227 header.val = kawa::Store::from_string(new_value);
228 }
229
230 if !has_x_for {
231 request.push_block(kawa::Block::Header(kawa::Pair {
232 key: kawa::Store::Static(b"X-Forwarded-For"),
233 val: kawa::Store::from_string(peer_ip.to_string()),
234 }));
235 }
236 if !has_forwarded {
237 let value = match (peer_ip, public_ip) {
238 (IpAddr::V4(_), IpAddr::V4(_)) => {
239 format!("proto={proto};for={peer_ip}:{peer_port};by={public_ip}")
240 }
241 (IpAddr::V4(_), IpAddr::V6(_)) => {
242 format!("proto={proto};for={peer_ip}:{peer_port};by=\"{public_ip}\"")
243 }
244 (IpAddr::V6(_), IpAddr::V4(_)) => {
245 format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by={public_ip}")
246 }
247 (IpAddr::V6(_), IpAddr::V6(_)) => {
248 format!("proto={proto};for=\"{peer_ip}:{peer_port}\";by=\"{public_ip}\"")
249 }
250 };
251 request.push_block(kawa::Block::Header(kawa::Pair {
252 key: kawa::Store::Static(b"Forwarded"),
253 val: kawa::Store::from_string(value),
254 }));
255 }
256 }
257 if !has_x_port {
258 request.push_block(kawa::Block::Header(kawa::Pair {
259 key: kawa::Store::Static(b"X-Forwarded-Port"),
260 val: kawa::Store::from_string(public_port.to_string()),
261 }));
262 }
263 if !has_x_proto {
264 request.push_block(kawa::Block::Header(kawa::Pair {
265 key: kawa::Store::Static(b"X-Forwarded-Proto"),
266 val: kawa::Store::Static(proto.as_bytes()),
267 }));
268 }
269
270 if !has_connection && self.closing {
272 request.push_block(kawa::Block::Header(kawa::Pair {
273 key: kawa::Store::Static(b"Connection"),
274 val: kawa::Store::Static(b"close"),
275 }));
276 }
277
278 request.push_block(kawa::Block::Header(kawa::Pair {
280 key: kawa::Store::Static(b"Sozu-Id"),
281 val: kawa::Store::from_string(self.id.to_string()),
282 }));
283 }
284
285 fn on_response_headers(&mut self, response: &mut GenericHttpStream) {
293 let buf = &mut response.storage.mut_buffer();
294
295 if let kawa::StatusLine::Response { code, reason, .. } = &response.detached.status_line {
297 self.status = Some(*code);
298 self.reason = reason
299 .data_opt(buf)
300 .and_then(|data| from_utf8(data).ok())
301 .map(ToOwned::to_owned);
302 }
303
304 if self.method == Some(Method::Head) {
305 response.parsing_phase = kawa::ParsingPhase::Terminated;
306 }
307
308 for block in &mut response.blocks {
312 match block {
313 kawa::Block::Header(header) if !header.is_elided() => {
314 let key = header.key.data(buf);
315 if compare_no_case(key, b"connection") {
316 if self.closing {
317 header.val = kawa::Store::Static(b"close");
318 } else {
319 let val = header.val.data(buf);
320 self.keep_alive_backend &= !compare_no_case(val, b"close");
321 }
322 }
323 }
324 _ => {}
325 }
326 }
327
328 if let Some(sticky_session) = &self.sticky_session {
331 if self.sticky_session != self.sticky_session_found {
332 response.push_block(kawa::Block::Header(kawa::Pair {
333 key: kawa::Store::Static(b"Set-Cookie"),
334 val: kawa::Store::from_string(format!(
335 "{}={}; Path=/",
336 self.sticky_name, sticky_session
337 )),
338 }));
339 }
340 }
341
342 response.push_block(kawa::Block::Header(kawa::Pair {
344 key: kawa::Store::Static(b"Sozu-Id"),
345 val: kawa::Store::from_string(self.id.to_string()),
346 }));
347 }
348
349 pub fn reset(&mut self) {
350 self.keep_alive_backend = true;
351 self.keep_alive_frontend = true;
352 self.sticky_session_found = None;
353 self.method = None;
354 self.authority = None;
355 self.path = None;
356 self.status = None;
357 self.reason = None;
358 self.user_agent = None;
359 }
360
361 pub fn log_context(&self) -> LogContext {
362 LogContext {
363 request_id: self.id,
364 cluster_id: self.cluster_id.as_deref(),
365 backend_id: self.backend_id.as_deref(),
366 }
367 }
368}