1use std::{cell::RefCell, rc::Rc};
10
11use mio::{net::TcpStream, *};
12use nom::{Err, HexDisplay};
13use rusty_ulid::Ulid;
14use sozu_command::{
15 config::MAX_LOOP_ITERATIONS,
16 logging::{LogContext, ansi_palette},
17};
18
19use super::{header::ProxyAddr, parser::parse_v2_header};
20use crate::metrics::names;
21use crate::{
22 Protocol, Readiness, SessionMetrics, StateResult,
23 pool::Checkout,
24 protocol::{
25 SessionResult, SessionState,
26 pipe::{Pipe, WebSocketContext},
27 },
28 socket::{SocketHandler, SocketResult},
29 sozu_command::ready::Ready,
30 tcp::TcpListener,
31 timer::TimeoutContainer,
32};
33
/// Builds the log prefix used when no session is at hand: the `PROXY-EXPECT`
/// tag wrapped between the first two values returned by `ansi_palette()`,
/// followed by the `>>>` separator.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries are used for this prefix.
        let (tag_color, color_reset, ..) = ansi_palette();
        format!(
            "{open}PROXY-EXPECT{reset}\t >>>",
            reset = color_reset,
            open = tag_color
        )
    }};
}
48
/// Builds the per-session log prefix.
///
/// `$self` must expose `log_context()`, `frontend_token`, `index` and
/// `frontend_readiness`; their values are interleaved with the five values
/// returned by `ansi_palette()`.
macro_rules! log_context {
    ($self:expr) => {{
        let (tag_color, color_reset, label_grey, label_gray, value_white) = ansi_palette();
        format!(
            "{gray}{ctx}{reset}\t{open}PROXY-EXPECT{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}index{reset}={white}{index}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            // session data
            ctx = $self.log_context(),
            frontend = $self.frontend_token.0,
            index = $self.index,
            readiness = $self.frontend_readiness,
            // palette
            open = tag_color,
            reset = color_reset,
            grey = label_grey,
            gray = label_gray,
            white = value_white,
        )
    }};
}
71
/// Which header size the expect state is currently waiting for.
///
/// The state starts by expecting the smallest header and moves on to the next
/// variant each time the bytes received so far are not enough to parse one.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HeaderLen {
    /// Header carrying IPv4 addresses.
    V4,
    /// Header carrying IPv6 addresses.
    V6,
    /// Header carrying Unix socket addresses.
    Unix,
}
78
/// Session state that waits for a proxy protocol header on the frontend
/// socket before the session is turned into a TCP pipe.
pub struct ExpectProxyProtocol<Front: SocketHandler> {
    /// Addresses extracted from the header; `None` until `readable` has
    /// successfully parsed one.
    pub addresses: Option<ProxyAddr>,
    /// Timeout attached to the frontend; triggered by `timeout` and cancelled
    /// by `cancel_timeouts`.
    pub container_frontend_timeout: TimeoutContainer,
    /// Accumulates the header bytes read so far. 232 bytes is the largest
    /// length `readable` ever asks for.
    frontend_buffer: [u8; 232],
    /// Interest and pending events for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Token identifying the frontend socket in readiness and timeout callbacks.
    pub frontend_token: Token,
    /// Frontend socket handler the header is read from.
    pub frontend: Front,
    /// Header size currently being waited for.
    header_len: HeaderLen,
    /// Number of bytes of `frontend_buffer` already filled.
    index: usize,
    /// Identifier of the session; used as `session_id` in the log context.
    pub request_id: Ulid,
}
91
92impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
93 pub fn new(
97 container_frontend_timeout: TimeoutContainer,
98 frontend: Front,
99 frontend_token: Token,
100 request_id: Ulid,
101 ) -> Self {
102 ExpectProxyProtocol {
103 addresses: None,
104 container_frontend_timeout,
105 frontend_buffer: [0; 232],
106 frontend_readiness: Readiness {
107 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
108 event: Ready::EMPTY,
109 },
110 frontend_token,
111 frontend,
112 header_len: HeaderLen::V4,
113 index: 0,
114 request_id,
115 }
116 }
117
118 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
119 let total_len = match self.header_len {
120 HeaderLen::V4 => 28,
121 HeaderLen::V6 => 52,
122 HeaderLen::Unix => 232,
123 };
124
125 let (sz, socket_result) = self
126 .frontend
127 .socket_read(&mut self.frontend_buffer[self.index..total_len]);
128 trace!(
129 "{} read {} bytes and res={:?}, total_len = {}",
130 log_context!(self),
131 sz,
132 socket_result,
133 total_len
134 );
135
136 if sz > 0 {
137 self.index += sz;
138
139 count!(names::backend::BYTES_IN, sz as i64);
140 metrics.bin += sz;
141
142 if self.index == self.frontend_buffer.len() {
143 self.frontend_readiness.interest.remove(Ready::READABLE);
144 }
145 } else {
146 self.frontend_readiness.event.remove(Ready::READABLE);
147 }
148
149 match socket_result {
150 SocketResult::Error => {
151 error!(
152 "{} front socket error, closing the connection (read {}, wrote {})",
153 log_context!(self),
154 metrics.bin,
155 metrics.bout
156 );
157 incr!(names::proxy_protocol::ERRORS);
158 self.frontend_readiness.reset();
159 return SessionResult::Close;
160 }
161 SocketResult::WouldBlock => {
162 self.frontend_readiness.event.remove(Ready::READABLE);
163 }
164 SocketResult::Closed => {
165 if self.index == 0 {
171 trace!(
172 "{} socket closed with 0 bytes, closing session",
173 log_context!(self)
174 );
175 return SessionResult::Close;
176 }
177 }
178 SocketResult::Continue => {}
179 }
180
181 match parse_v2_header(&self.frontend_buffer[..self.index]) {
182 Ok((rest, header)) => {
183 trace!(
184 "{} got expect header: {:?}, rest.len() = {}",
185 log_context!(self),
186 header,
187 rest.len()
188 );
189 self.addresses = Some(header.addr);
190 SessionResult::Upgrade
191 }
192 Err(Err::Incomplete(_)) => {
193 match self.header_len {
194 HeaderLen::V4 => {
195 if self.index == 28 {
196 self.header_len = HeaderLen::V6;
197 }
198 }
199 HeaderLen::V6 => {
200 if self.index == 52 {
201 self.header_len = HeaderLen::Unix;
202 }
203 }
204 HeaderLen::Unix => {
205 if self.index == 232 {
206 error!(
207 "{} proxy protocol header exceeds maximum size (232 bytes), closing",
208 log_context!(self)
209 );
210 incr!(names::proxy_protocol::ERRORS);
211 self.frontend_readiness.reset();
212 return SessionResult::Close;
213 }
214 }
215 };
216 SessionResult::Continue
217 }
218 Err(Err::Error(e)) | Err(Err::Failure(e)) => {
219 error!(
220 "{} parse error, closing the connection:\n{}",
221 log_context!(self),
222 e.input.to_hex(16)
223 );
224 incr!(names::proxy_protocol::ERRORS);
225 self.frontend_readiness.reset();
226 SessionResult::Close
227 }
228 }
229 }
230
231 pub fn front_socket(&self) -> &TcpStream {
232 self.frontend.socket_ref()
233 }
234
235 pub fn into_pipe(
236 self,
237 front_buf: Checkout,
238 back_buf: Checkout,
239 backend_socket: Option<TcpStream>,
240 backend_token: Option<Token>,
241 listener: Rc<RefCell<TcpListener>>,
242 ) -> Pipe<Front, TcpListener> {
243 let addr = self
251 .addresses
252 .as_ref()
253 .and_then(|pa| pa.source())
254 .or_else(|| self.front_socket().peer_addr().ok());
255
256 let mut pipe = Pipe::new(
257 back_buf,
258 None,
259 backend_socket,
260 None,
261 None,
262 Some(self.container_frontend_timeout),
263 None,
264 front_buf,
265 self.frontend_token,
266 self.frontend,
267 listener,
268 Protocol::TCP,
269 self.request_id,
270 self.request_id,
271 addr,
272 WebSocketContext::Tcp,
273 );
274
275 pipe.frontend_readiness.event = self.frontend_readiness.event;
276
277 if let Some(backend_token) = backend_token {
278 pipe.set_back_token(backend_token);
279 }
280
281 pipe
282 }
283
284 pub fn log_context(&self) -> LogContext<'_> {
285 LogContext {
286 session_id: self.request_id,
287 request_id: None,
288 cluster_id: None,
289 backend_id: None,
290 }
291 }
292}
293
294impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
295 fn ready(
296 &mut self,
297 _session: Rc<RefCell<dyn crate::ProxySession>>,
298 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
299 metrics: &mut SessionMetrics,
300 ) -> SessionResult {
301 let mut counter = 0;
302
303 if self.frontend_readiness.event.is_hup() {
304 return SessionResult::Close;
305 }
306
307 while counter < MAX_LOOP_ITERATIONS {
308 let frontend_interest = self.frontend_readiness.filter_interest();
309
310 trace!(
311 "{} {:?} -> None",
312 log_context!(self),
313 self.frontend_readiness
314 );
315
316 if frontend_interest.is_empty() {
317 break;
318 }
319
320 if frontend_interest.is_readable() {
321 let session_result = self.readable(metrics);
322 if session_result != SessionResult::Continue {
323 return session_result;
324 }
325 }
326
327 if frontend_interest.is_error() {
328 error!("{} front error, disconnecting", log_context!(self));
329 self.frontend_readiness.interest = Ready::EMPTY;
330
331 return SessionResult::Close;
332 }
333
334 counter += 1;
335 }
336
337 if counter >= MAX_LOOP_ITERATIONS {
338 error!(
339 "{} handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
340 log_context!(self),
341 MAX_LOOP_ITERATIONS
342 );
343 incr!(names::http::INFINITE_LOOP_ERROR);
344
345 self.print_state("");
346
347 return SessionResult::Close;
348 }
349
350 SessionResult::Continue
351 }
352
353 fn update_readiness(&mut self, token: Token, events: Ready) {
354 if self.frontend_token == token {
355 self.frontend_readiness.event |= events;
356 }
357 }
358
359 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
360 if self.frontend_token == token {
361 self.container_frontend_timeout.triggered();
362 return StateResult::CloseSession;
363 }
364
365 error!(
366 "{} got timeout for an invalid token: {:?}",
367 log_module_context!(),
368 token
369 );
370 StateResult::CloseSession
371 }
372
373 fn cancel_timeouts(&mut self) {
374 self.container_frontend_timeout.cancel();
375 }
376
377 fn print_state(&self, context: &str) {
378 error!(
379 "{} {} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
380 log_context!(self),
381 context,
382 self.frontend_token,
383 self.frontend_readiness
384 );
385 }
386}
387
388#[cfg(test)]
389mod expect_test {
390 use std::{
391 io::Write,
392 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream as StdTcpStream},
393 sync::{Arc, Barrier},
394 thread::{self, JoinHandle},
395 time::Duration,
396 };
397
398 use mio::net::TcpListener;
399 use rusty_ulid::Ulid;
400
401 use super::*;
402 use crate::protocol::proxy_protocol::header::*;
403
404 #[test]
410 fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
411 setup_test_logger!();
412 let middleware_addr: SocketAddr = "127.0.0.1:3500".parse().expect("parse address error");
413 let barrier = Arc::new(Barrier::new(2));
414
415 let upfront = start_upfront_middleware(middleware_addr, barrier.clone());
416 start_middleware(middleware_addr, barrier);
417
418 upfront.join().expect("should join");
419 }
420
421 fn start_middleware(middleware_addr: SocketAddr, barrier: Arc<Barrier>) {
423 let upfront_middleware_conn_listener = TcpListener::bind(middleware_addr)
424 .expect("could not accept upfront middleware connection");
425 let session_stream;
426 barrier.wait();
427
428 loop {
430 if let Ok((stream, _addr)) = upfront_middleware_conn_listener.accept() {
431 session_stream = stream;
432 break;
433 }
434 }
435
436 let mut session_metrics = SessionMetrics::new(None);
437 let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
438 let mut expect_pp = ExpectProxyProtocol::new(
439 container_frontend_timeout,
440 session_stream,
441 Token(0),
442 Ulid::generate(),
443 );
444
445 let mut res = SessionResult::Continue;
446 while res == SessionResult::Continue {
447 res = expect_pp.readable(&mut session_metrics);
448 }
449
450 if res != SessionResult::Upgrade {
451 panic!("Should receive a complete proxy protocol header, res = {res:?}");
452 };
453 }
454
455 fn start_upfront_middleware(
457 next_middleware_addr: SocketAddr,
458 barrier: Arc<Barrier>,
459 ) -> JoinHandle<()> {
460 thread::spawn(move || {
461 let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
462 let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
463 let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();
464
465 barrier.wait();
466 match StdTcpStream::connect(next_middleware_addr) {
467 Ok(mut stream) => {
468 stream.write_all(&proxy_protocol).unwrap();
469 }
470 Err(e) => panic!("could not connect to the next middleware: {e}"),
471 };
472 })
473 }
474}