sozu_lib/protocol/proxy_protocol/
expect.rs1use std::{cell::RefCell, rc::Rc};
2
3use mio::{net::TcpStream, *};
4use nom::{Err, HexDisplay};
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use crate::{
9 pool::Checkout,
10 protocol::{
11 pipe::{Pipe, WebSocketContext},
12 SessionResult, SessionState,
13 },
14 socket::{SocketHandler, SocketResult},
15 sozu_command::ready::Ready,
16 tcp::TcpListener,
17 timer::TimeoutContainer,
18 Protocol, Readiness, SessionMetrics, StateResult,
19};
20
21use super::{header::ProxyAddr, parser::parse_v2_header};
22
/// Address family the PROXY protocol v2 reader is currently trying to parse.
///
/// Each variant stands for the total header length (in bytes) that
/// `ExpectProxyProtocol::readable` waits for before moving on to the next,
/// larger candidate:
///
/// | variant | total length |
/// |---------|--------------|
/// | `V4`    | 28           |
/// | `V6`    | 52           |
/// | `Unix`  | 232          |
///
/// Per the PROXY protocol v2 layout this is the 16-byte fixed part followed by
/// 12, 36 or 216 bytes of addresses respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeaderLen {
    /// Header carrying IPv4 addresses (28 bytes in total).
    V4,
    /// Header carrying IPv6 addresses (52 bytes in total).
    V6,
    /// Header carrying Unix socket addresses (232 bytes in total).
    Unix,
}
29
/// Session state that reads a PROXY protocol v2 header from the frontend
/// socket before the connection is handed over to a `Pipe` (see `into_pipe`).
pub struct ExpectProxyProtocol<Front: SocketHandler> {
    /// Addresses extracted from the header; `None` until `readable` has
    /// successfully parsed one.
    pub addresses: Option<ProxyAddr>,
    /// Timeout attached to the frontend; passed on to the `Pipe` by `into_pipe`.
    pub container_frontend_timeout: TimeoutContainer,
    // Raw header bytes received so far. 232 bytes is the largest length
    // `readable` ever waits for (the `HeaderLen::Unix` case).
    frontend_buffer: [u8; 232],
    /// Interest and pending events for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Token identifying the frontend socket in readiness and timeout callbacks.
    pub frontend_token: Token,
    /// Frontend socket the header is read from.
    pub frontend: Front,
    // Header length currently being attempted; `readable` grows it from
    // `V4` to `V6` to `Unix` while the parser reports an incomplete header.
    header_len: HeaderLen,
    // Number of valid bytes at the start of `frontend_buffer`.
    index: usize,
    /// Identifier of the request, used for the logging context.
    pub request_id: Ulid,
}
42
43impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
44 pub fn new(
48 container_frontend_timeout: TimeoutContainer,
49 frontend: Front,
50 frontend_token: Token,
51 request_id: Ulid,
52 ) -> Self {
53 ExpectProxyProtocol {
54 addresses: None,
55 container_frontend_timeout,
56 frontend_buffer: [0; 232],
57 frontend_readiness: Readiness {
58 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
59 event: Ready::EMPTY,
60 },
61 frontend_token,
62 frontend,
63 header_len: HeaderLen::V4,
64 index: 0,
65 request_id,
66 }
67 }
68
69 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
70 let total_len = match self.header_len {
71 HeaderLen::V4 => 28,
72 HeaderLen::V6 => 52,
73 HeaderLen::Unix => 232,
74 };
75
76 let (sz, socket_result) = self
77 .frontend
78 .socket_read(&mut self.frontend_buffer[self.index..total_len]);
79 trace!(
80 "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}, index = {}, total_len = {}",
81 self.frontend_token,
82 sz,
83 socket_result,
84 self.index,
85 total_len
86 );
87
88 if sz > 0 {
89 self.index += sz;
90
91 count!("bytes_in", sz as i64);
92 metrics.bin += sz;
93
94 if self.index == self.frontend_buffer.len() {
95 self.frontend_readiness.interest.remove(Ready::READABLE);
96 }
97 } else {
98 self.frontend_readiness.event.remove(Ready::READABLE);
99 }
100
101 match socket_result {
102 SocketResult::Error => {
103 error!("[{:?}] (expect proxy) front socket error, closing the connection(read {}, wrote {})", self.frontend_token, metrics.bin, metrics.bout);
104 incr!("proxy_protocol.errors");
105 self.frontend_readiness.reset();
106 return SessionResult::Close;
107 }
108 SocketResult::WouldBlock => {
109 self.frontend_readiness.event.remove(Ready::READABLE);
110 }
111 SocketResult::Closed | SocketResult::Continue => {}
112 }
113
114 match parse_v2_header(&self.frontend_buffer[..self.index]) {
115 Ok((rest, header)) => {
116 trace!(
117 "got expect header: {:?}, rest.len() = {}",
118 header,
119 rest.len()
120 );
121 self.addresses = Some(header.addr);
122 SessionResult::Upgrade
123 }
124 Err(Err::Incomplete(_)) => {
125 match self.header_len {
126 HeaderLen::V4 => {
127 if self.index == 28 {
128 self.header_len = HeaderLen::V6;
129 }
130 }
131 HeaderLen::V6 => {
132 if self.index == 52 {
133 self.header_len = HeaderLen::Unix;
134 }
135 }
136 HeaderLen::Unix => {
137 if self.index == 232 {
138 error!(
139 "[{:?}] front socket parse error, closing the connection",
140 self.frontend_token
141 );
142 incr!("proxy_protocol.errors");
143 self.frontend_readiness.reset();
144 return SessionResult::Continue;
145 }
146 }
147 };
148 SessionResult::Continue
149 }
150 Err(Err::Error(e)) | Err(Err::Failure(e)) => {
151 error!("[{:?}] expect proxy protocol front socket parse error, closing the connection:\n{}", self.frontend_token, e.input.to_hex(16));
152 incr!("proxy_protocol.errors");
153 self.frontend_readiness.reset();
154 SessionResult::Close
155 }
156 }
157 }
158
159 pub fn front_socket(&self) -> &TcpStream {
160 self.frontend.socket_ref()
161 }
162
163 pub fn into_pipe(
164 self,
165 front_buf: Checkout,
166 back_buf: Checkout,
167 backend_socket: Option<TcpStream>,
168 backend_token: Option<Token>,
169 listener: Rc<RefCell<TcpListener>>,
170 ) -> Pipe<Front, TcpListener> {
171 let addr = self.front_socket().peer_addr().ok();
172
173 let mut pipe = Pipe::new(
174 back_buf,
175 None,
176 backend_socket,
177 None,
178 None,
179 Some(self.container_frontend_timeout),
180 None,
181 front_buf,
182 self.frontend_token,
183 self.frontend,
184 listener,
185 Protocol::TCP,
186 self.request_id,
187 addr,
188 WebSocketContext::Tcp,
189 );
190
191 pipe.frontend_readiness.event = self.frontend_readiness.event;
192
193 if let Some(backend_token) = backend_token {
194 pipe.set_back_token(backend_token);
195 }
196
197 pipe
198 }
199
200 pub fn log_context(&self) -> LogContext {
201 LogContext {
202 request_id: self.request_id,
203 cluster_id: None,
204 backend_id: None,
205 }
206 }
207}
208
209impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
210 fn ready(
211 &mut self,
212 _session: Rc<RefCell<dyn crate::ProxySession>>,
213 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
214 metrics: &mut SessionMetrics,
215 ) -> SessionResult {
216 let mut counter = 0;
217
218 if self.frontend_readiness.event.is_hup() {
219 return SessionResult::Close;
220 }
221
222 while counter < MAX_LOOP_ITERATIONS {
223 let frontend_interest = self.frontend_readiness.filter_interest();
224
225 trace!(
226 "PROXY\t{} {:?} {:?} -> None",
227 self.log_context(),
228 self.frontend_token,
229 self.frontend_readiness
230 );
231
232 if frontend_interest.is_empty() {
233 break;
234 }
235
236 if frontend_interest.is_readable() {
237 let session_result = self.readable(metrics);
238 if session_result != SessionResult::Continue {
239 return session_result;
240 }
241 }
242
243 if frontend_interest.is_error() {
244 error!(
245 "PROXY session {:?} front error, disconnecting",
246 self.frontend_token
247 );
248 self.frontend_readiness.interest = Ready::EMPTY;
249
250 return SessionResult::Close;
251 }
252
253 counter += 1;
254 }
255
256 if counter >= MAX_LOOP_ITERATIONS {
257 error!(
258 "PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
259 self.frontend_token, MAX_LOOP_ITERATIONS
260 );
261 incr!("http.infinite_loop.error");
262
263 self.print_state("");
264
265 return SessionResult::Close;
266 }
267
268 SessionResult::Continue
269 }
270
271 fn update_readiness(&mut self, token: Token, events: Ready) {
272 if self.frontend_token == token {
273 self.frontend_readiness.event |= events;
274 }
275 }
276
277 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
278 if self.frontend_token == token {
279 self.container_frontend_timeout.triggered();
280 return StateResult::CloseSession;
281 }
282
283 error!(
284 "Expect state: got timeout for an invalid token: {:?}",
285 token
286 );
287 StateResult::CloseSession
288 }
289
290 fn cancel_timeouts(&mut self) {
291 self.container_frontend_timeout.cancel();
292 }
293
294 fn print_state(&self, context: &str) {
295 error!(
296 "{} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
297 context, self.frontend_token, self.frontend_readiness
298 );
299 }
300}
301
#[cfg(test)]
mod expect_test {
    use super::*;
    use mio::net::TcpListener;
    use rusty_ulid::Ulid;
    use std::{
        io::Write,
        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream as StdTcpStream},
        sync::{Arc, Barrier},
        thread::{self, JoinHandle},
        time::Duration,
    };

    use crate::protocol::proxy_protocol::header::*;

    // An "upfront" middleware connects and sends a PROXY protocol v2 header;
    // the middleware under test must read it and ask for an upgrade.
    #[test]
    fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
        setup_test_logger!();
        let middleware_addr: SocketAddr = "127.0.0.1:3500".parse().expect("parse address error");
        let barrier = Arc::new(Barrier::new(2));

        let upfront = start_upfront_middleware(middleware_addr, barrier.clone());
        start_middleware(middleware_addr, barrier);

        upfront.join().expect("should join");
    }

    // Accepts the upfront connection and runs `ExpectProxyProtocol::readable`
    // until it stops returning `Continue`; panics unless it ends on `Upgrade`.
    fn start_middleware(middleware_addr: SocketAddr, barrier: Arc<Barrier>) {
        let upfront_middleware_conn_listener = TcpListener::bind(middleware_addr)
            .expect("could not bind the middleware listener");
        // The listener is bound: let the upfront middleware connect.
        barrier.wait();

        // Retry `accept` until the connection from the upfront middleware is
        // available.
        let session_stream = loop {
            if let Ok((stream, _addr)) = upfront_middleware_conn_listener.accept() {
                break stream;
            }
        };

        let mut session_metrics = SessionMetrics::new(None);
        let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
        let mut expect_pp = ExpectProxyProtocol::new(
            container_frontend_timeout,
            session_stream,
            Token(0),
            Ulid::generate(),
        );

        let mut res = SessionResult::Continue;
        while res == SessionResult::Continue {
            res = expect_pp.readable(&mut session_metrics);
        }

        assert!(
            res == SessionResult::Upgrade,
            "Should receive a complete proxy protocol header, res = {res:?}"
        );
    }

    // Spawns a thread that connects to `next_middleware_addr` once the barrier
    // is released and sends one PROXY protocol v2 header.
    fn start_upfront_middleware(
        next_middleware_addr: SocketAddr,
        barrier: Arc<Barrier>,
    ) -> JoinHandle<()> {
        thread::spawn(move || {
            let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
            let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
            let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();

            barrier.wait();
            match StdTcpStream::connect(next_middleware_addr) {
                Ok(mut stream) => {
                    // `write` may send only part of the header; `write_all`
                    // guarantees the receiver gets every byte.
                    stream
                        .write_all(&proxy_protocol)
                        .expect("could not send the proxy protocol header");
                }
                Err(e) => panic!("could not connect to the next middleware: {e}"),
            };
        })
    }
}