sozu_lib/protocol/proxy_protocol/
expect.rs1use std::{cell::RefCell, rc::Rc};
2
3use mio::{net::TcpStream, *};
4use nom::{Err, HexDisplay};
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use super::{header::ProxyAddr, parser::parse_v2_header};
9use crate::{
10 Protocol, Readiness, SessionMetrics, StateResult,
11 pool::Checkout,
12 protocol::{
13 SessionResult, SessionState,
14 pipe::{Pipe, WebSocketContext},
15 },
16 socket::{SocketHandler, SocketResult},
17 sozu_command::ready::Ready,
18 tcp::TcpListener,
19 timer::TimeoutContainer,
20};
21
22#[derive(Clone, Copy)]
23pub enum HeaderLen {
24 V4,
25 V6,
26 Unix,
27}
28
29pub struct ExpectProxyProtocol<Front: SocketHandler> {
31 pub addresses: Option<ProxyAddr>,
32 pub container_frontend_timeout: TimeoutContainer,
33 frontend_buffer: [u8; 232],
34 pub frontend_readiness: Readiness,
35 pub frontend_token: Token,
36 pub frontend: Front,
37 header_len: HeaderLen,
38 index: usize,
39 pub request_id: Ulid,
40}
41
42impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
43 pub fn new(
47 container_frontend_timeout: TimeoutContainer,
48 frontend: Front,
49 frontend_token: Token,
50 request_id: Ulid,
51 ) -> Self {
52 ExpectProxyProtocol {
53 addresses: None,
54 container_frontend_timeout,
55 frontend_buffer: [0; 232],
56 frontend_readiness: Readiness {
57 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
58 event: Ready::EMPTY,
59 },
60 frontend_token,
61 frontend,
62 header_len: HeaderLen::V4,
63 index: 0,
64 request_id,
65 }
66 }
67
68 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
69 let total_len = match self.header_len {
70 HeaderLen::V4 => 28,
71 HeaderLen::V6 => 52,
72 HeaderLen::Unix => 232,
73 };
74
75 let (sz, socket_result) = self
76 .frontend
77 .socket_read(&mut self.frontend_buffer[self.index..total_len]);
78 trace!(
79 "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}, index = {}, total_len = {}",
80 self.frontend_token, sz, socket_result, self.index, total_len
81 );
82
83 if sz > 0 {
84 self.index += sz;
85
86 count!("bytes_in", sz as i64);
87 metrics.bin += sz;
88
89 if self.index == self.frontend_buffer.len() {
90 self.frontend_readiness.interest.remove(Ready::READABLE);
91 }
92 } else {
93 self.frontend_readiness.event.remove(Ready::READABLE);
94 }
95
96 match socket_result {
97 SocketResult::Error => {
98 error!(
99 "[{:?}] (expect proxy) front socket error, closing the connection(read {}, wrote {})",
100 self.frontend_token, metrics.bin, metrics.bout
101 );
102 incr!("proxy_protocol.errors");
103 self.frontend_readiness.reset();
104 return SessionResult::Close;
105 }
106 SocketResult::WouldBlock => {
107 self.frontend_readiness.event.remove(Ready::READABLE);
108 }
109 SocketResult::Closed | SocketResult::Continue => {}
110 }
111
112 match parse_v2_header(&self.frontend_buffer[..self.index]) {
113 Ok((rest, header)) => {
114 trace!(
115 "got expect header: {:?}, rest.len() = {}",
116 header,
117 rest.len()
118 );
119 self.addresses = Some(header.addr);
120 SessionResult::Upgrade
121 }
122 Err(Err::Incomplete(_)) => {
123 match self.header_len {
124 HeaderLen::V4 => {
125 if self.index == 28 {
126 self.header_len = HeaderLen::V6;
127 }
128 }
129 HeaderLen::V6 => {
130 if self.index == 52 {
131 self.header_len = HeaderLen::Unix;
132 }
133 }
134 HeaderLen::Unix => {
135 if self.index == 232 {
136 error!(
137 "[{:?}] front socket parse error, closing the connection",
138 self.frontend_token
139 );
140 incr!("proxy_protocol.errors");
141 self.frontend_readiness.reset();
142 return SessionResult::Continue;
143 }
144 }
145 };
146 SessionResult::Continue
147 }
148 Err(Err::Error(e)) | Err(Err::Failure(e)) => {
149 error!(
150 "[{:?}] expect proxy protocol front socket parse error, closing the connection:\n{}",
151 self.frontend_token,
152 e.input.to_hex(16)
153 );
154 incr!("proxy_protocol.errors");
155 self.frontend_readiness.reset();
156 SessionResult::Close
157 }
158 }
159 }
160
161 pub fn front_socket(&self) -> &TcpStream {
162 self.frontend.socket_ref()
163 }
164
165 pub fn into_pipe(
166 self,
167 front_buf: Checkout,
168 back_buf: Checkout,
169 backend_socket: Option<TcpStream>,
170 backend_token: Option<Token>,
171 listener: Rc<RefCell<TcpListener>>,
172 ) -> Pipe<Front, TcpListener> {
173 let addr = self.front_socket().peer_addr().ok();
174
175 let mut pipe = Pipe::new(
176 back_buf,
177 None,
178 backend_socket,
179 None,
180 None,
181 Some(self.container_frontend_timeout),
182 None,
183 front_buf,
184 self.frontend_token,
185 self.frontend,
186 listener,
187 Protocol::TCP,
188 self.request_id,
189 addr,
190 WebSocketContext::Tcp,
191 );
192
193 pipe.frontend_readiness.event = self.frontend_readiness.event;
194
195 if let Some(backend_token) = backend_token {
196 pipe.set_back_token(backend_token);
197 }
198
199 pipe
200 }
201
202 pub fn log_context(&self) -> LogContext<'_> {
203 LogContext {
204 request_id: self.request_id,
205 cluster_id: None,
206 backend_id: None,
207 }
208 }
209}
210
211impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
212 fn ready(
213 &mut self,
214 _session: Rc<RefCell<dyn crate::ProxySession>>,
215 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
216 metrics: &mut SessionMetrics,
217 ) -> SessionResult {
218 let mut counter = 0;
219
220 if self.frontend_readiness.event.is_hup() {
221 return SessionResult::Close;
222 }
223
224 while counter < MAX_LOOP_ITERATIONS {
225 let frontend_interest = self.frontend_readiness.filter_interest();
226
227 trace!(
228 "PROXY\t{} {:?} {:?} -> None",
229 self.log_context(),
230 self.frontend_token,
231 self.frontend_readiness
232 );
233
234 if frontend_interest.is_empty() {
235 break;
236 }
237
238 if frontend_interest.is_readable() {
239 let session_result = self.readable(metrics);
240 if session_result != SessionResult::Continue {
241 return session_result;
242 }
243 }
244
245 if frontend_interest.is_error() {
246 error!(
247 "PROXY session {:?} front error, disconnecting",
248 self.frontend_token
249 );
250 self.frontend_readiness.interest = Ready::EMPTY;
251
252 return SessionResult::Close;
253 }
254
255 counter += 1;
256 }
257
258 if counter >= MAX_LOOP_ITERATIONS {
259 error!(
260 "PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
261 self.frontend_token, MAX_LOOP_ITERATIONS
262 );
263 incr!("http.infinite_loop.error");
264
265 self.print_state("");
266
267 return SessionResult::Close;
268 }
269
270 SessionResult::Continue
271 }
272
273 fn update_readiness(&mut self, token: Token, events: Ready) {
274 if self.frontend_token == token {
275 self.frontend_readiness.event |= events;
276 }
277 }
278
279 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
280 if self.frontend_token == token {
281 self.container_frontend_timeout.triggered();
282 return StateResult::CloseSession;
283 }
284
285 error!(
286 "Expect state: got timeout for an invalid token: {:?}",
287 token
288 );
289 StateResult::CloseSession
290 }
291
292 fn cancel_timeouts(&mut self) {
293 self.container_frontend_timeout.cancel();
294 }
295
296 fn print_state(&self, context: &str) {
297 error!(
298 "{} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
299 context, self.frontend_token, self.frontend_readiness
300 );
301 }
302}
303
304#[cfg(test)]
305mod expect_test {
306 use std::{
307 io::Write,
308 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream as StdTcpStream},
309 sync::{Arc, Barrier},
310 thread::{self, JoinHandle},
311 time::Duration,
312 };
313
314 use mio::net::TcpListener;
315 use rusty_ulid::Ulid;
316
317 use super::*;
318 use crate::protocol::proxy_protocol::header::*;
319
320 #[test]
326 fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
327 setup_test_logger!();
328 let middleware_addr: SocketAddr = "127.0.0.1:3500".parse().expect("parse address error");
329 let barrier = Arc::new(Barrier::new(2));
330
331 let upfront = start_upfront_middleware(middleware_addr, barrier.clone());
332 start_middleware(middleware_addr, barrier);
333
334 upfront.join().expect("should join");
335 }
336
337 fn start_middleware(middleware_addr: SocketAddr, barrier: Arc<Barrier>) {
339 let upfront_middleware_conn_listener = TcpListener::bind(middleware_addr)
340 .expect("could not accept upfront middleware connection");
341 let session_stream;
342 barrier.wait();
343
344 loop {
346 if let Ok((stream, _addr)) = upfront_middleware_conn_listener.accept() {
347 session_stream = stream;
348 break;
349 }
350 }
351
352 let mut session_metrics = SessionMetrics::new(None);
353 let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
354 let mut expect_pp = ExpectProxyProtocol::new(
355 container_frontend_timeout,
356 session_stream,
357 Token(0),
358 Ulid::generate(),
359 );
360
361 let mut res = SessionResult::Continue;
362 while res == SessionResult::Continue {
363 res = expect_pp.readable(&mut session_metrics);
364 }
365
366 if res != SessionResult::Upgrade {
367 panic!("Should receive a complete proxy protocol header, res = {res:?}");
368 };
369 }
370
371 fn start_upfront_middleware(
373 next_middleware_addr: SocketAddr,
374 barrier: Arc<Barrier>,
375 ) -> JoinHandle<()> {
376 thread::spawn(move || {
377 let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
378 let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
379 let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();
380
381 barrier.wait();
382 match StdTcpStream::connect(next_middleware_addr) {
383 Ok(mut stream) => {
384 stream.write(&proxy_protocol).unwrap();
385 }
386 Err(e) => panic!("could not connect to the next middleware: {e}"),
387 };
388 })
389 }
390}