sozu_lib/protocol/proxy_protocol/
expect.rs

1use std::{cell::RefCell, rc::Rc};
2
3use mio::{net::TcpStream, *};
4use nom::{Err, HexDisplay};
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use crate::{
9    pool::Checkout,
10    protocol::{
11        pipe::{Pipe, WebSocketContext},
12        SessionResult, SessionState,
13    },
14    socket::{SocketHandler, SocketResult},
15    sozu_command::ready::Ready,
16    tcp::TcpListener,
17    timer::TimeoutContainer,
18    Protocol, Readiness, SessionMetrics, StateResult,
19};
20
21use super::{header::ProxyAddr, parser::parse_v2_header};
22
23#[derive(Clone, Copy)]
24pub enum HeaderLen {
25    V4,
26    V6,
27    Unix,
28}
29
30// TODO: should have a backend
31pub struct ExpectProxyProtocol<Front: SocketHandler> {
32    pub addresses: Option<ProxyAddr>,
33    pub container_frontend_timeout: TimeoutContainer,
34    frontend_buffer: [u8; 232],
35    pub frontend_readiness: Readiness,
36    pub frontend_token: Token,
37    pub frontend: Front,
38    header_len: HeaderLen,
39    index: usize,
40    pub request_id: Ulid,
41}
42
43impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
44    /// Instantiate a new ExpectProxyProtocol SessionState with:
45    /// - frontend_interest: READABLE | HUP | ERROR
46    /// - frontend_event: EMPTY
47    pub fn new(
48        container_frontend_timeout: TimeoutContainer,
49        frontend: Front,
50        frontend_token: Token,
51        request_id: Ulid,
52    ) -> Self {
53        ExpectProxyProtocol {
54            addresses: None,
55            container_frontend_timeout,
56            frontend_buffer: [0; 232],
57            frontend_readiness: Readiness {
58                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
59                event: Ready::EMPTY,
60            },
61            frontend_token,
62            frontend,
63            header_len: HeaderLen::V4,
64            index: 0,
65            request_id,
66        }
67    }
68
69    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
70        let total_len = match self.header_len {
71            HeaderLen::V4 => 28,
72            HeaderLen::V6 => 52,
73            HeaderLen::Unix => 232,
74        };
75
76        let (sz, socket_result) = self
77            .frontend
78            .socket_read(&mut self.frontend_buffer[self.index..total_len]);
79        trace!(
80            "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}, index = {}, total_len = {}",
81            self.frontend_token,
82            sz,
83            socket_result,
84            self.index,
85            total_len
86        );
87
88        if sz > 0 {
89            self.index += sz;
90
91            count!("bytes_in", sz as i64);
92            metrics.bin += sz;
93
94            if self.index == self.frontend_buffer.len() {
95                self.frontend_readiness.interest.remove(Ready::READABLE);
96            }
97        } else {
98            self.frontend_readiness.event.remove(Ready::READABLE);
99        }
100
101        match socket_result {
102            SocketResult::Error => {
103                error!("[{:?}] (expect proxy) front socket error, closing the connection(read {}, wrote {})", self.frontend_token, metrics.bin, metrics.bout);
104                incr!("proxy_protocol.errors");
105                self.frontend_readiness.reset();
106                return SessionResult::Close;
107            }
108            SocketResult::WouldBlock => {
109                self.frontend_readiness.event.remove(Ready::READABLE);
110            }
111            SocketResult::Closed | SocketResult::Continue => {}
112        }
113
114        match parse_v2_header(&self.frontend_buffer[..self.index]) {
115            Ok((rest, header)) => {
116                trace!(
117                    "got expect header: {:?}, rest.len() = {}",
118                    header,
119                    rest.len()
120                );
121                self.addresses = Some(header.addr);
122                SessionResult::Upgrade
123            }
124            Err(Err::Incomplete(_)) => {
125                match self.header_len {
126                    HeaderLen::V4 => {
127                        if self.index == 28 {
128                            self.header_len = HeaderLen::V6;
129                        }
130                    }
131                    HeaderLen::V6 => {
132                        if self.index == 52 {
133                            self.header_len = HeaderLen::Unix;
134                        }
135                    }
136                    HeaderLen::Unix => {
137                        if self.index == 232 {
138                            error!(
139                                "[{:?}] front socket parse error, closing the connection",
140                                self.frontend_token
141                            );
142                            incr!("proxy_protocol.errors");
143                            self.frontend_readiness.reset();
144                            return SessionResult::Continue;
145                        }
146                    }
147                };
148                SessionResult::Continue
149            }
150            Err(Err::Error(e)) | Err(Err::Failure(e)) => {
151                error!("[{:?}] expect proxy protocol front socket parse error, closing the connection:\n{}", self.frontend_token, e.input.to_hex(16));
152                incr!("proxy_protocol.errors");
153                self.frontend_readiness.reset();
154                SessionResult::Close
155            }
156        }
157    }
158
159    pub fn front_socket(&self) -> &TcpStream {
160        self.frontend.socket_ref()
161    }
162
163    pub fn into_pipe(
164        self,
165        front_buf: Checkout,
166        back_buf: Checkout,
167        backend_socket: Option<TcpStream>,
168        backend_token: Option<Token>,
169        listener: Rc<RefCell<TcpListener>>,
170    ) -> Pipe<Front, TcpListener> {
171        let addr = self.front_socket().peer_addr().ok();
172
173        let mut pipe = Pipe::new(
174            back_buf,
175            None,
176            backend_socket,
177            None,
178            None,
179            Some(self.container_frontend_timeout),
180            None,
181            front_buf,
182            self.frontend_token,
183            self.frontend,
184            listener,
185            Protocol::TCP,
186            self.request_id,
187            addr,
188            WebSocketContext::Tcp,
189        );
190
191        pipe.frontend_readiness.event = self.frontend_readiness.event;
192
193        if let Some(backend_token) = backend_token {
194            pipe.set_back_token(backend_token);
195        }
196
197        pipe
198    }
199
200    pub fn log_context(&self) -> LogContext {
201        LogContext {
202            request_id: self.request_id,
203            cluster_id: None,
204            backend_id: None,
205        }
206    }
207}
208
209impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
210    fn ready(
211        &mut self,
212        _session: Rc<RefCell<dyn crate::ProxySession>>,
213        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
214        metrics: &mut SessionMetrics,
215    ) -> SessionResult {
216        let mut counter = 0;
217
218        if self.frontend_readiness.event.is_hup() {
219            return SessionResult::Close;
220        }
221
222        while counter < MAX_LOOP_ITERATIONS {
223            let frontend_interest = self.frontend_readiness.filter_interest();
224
225            trace!(
226                "PROXY\t{} {:?} {:?} -> None",
227                self.log_context(),
228                self.frontend_token,
229                self.frontend_readiness
230            );
231
232            if frontend_interest.is_empty() {
233                break;
234            }
235
236            if frontend_interest.is_readable() {
237                let session_result = self.readable(metrics);
238                if session_result != SessionResult::Continue {
239                    return session_result;
240                }
241            }
242
243            if frontend_interest.is_error() {
244                error!(
245                    "PROXY session {:?} front error, disconnecting",
246                    self.frontend_token
247                );
248                self.frontend_readiness.interest = Ready::EMPTY;
249
250                return SessionResult::Close;
251            }
252
253            counter += 1;
254        }
255
256        if counter >= MAX_LOOP_ITERATIONS {
257            error!(
258                "PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
259                self.frontend_token, MAX_LOOP_ITERATIONS
260            );
261            incr!("http.infinite_loop.error");
262
263            self.print_state("");
264
265            return SessionResult::Close;
266        }
267
268        SessionResult::Continue
269    }
270
271    fn update_readiness(&mut self, token: Token, events: Ready) {
272        if self.frontend_token == token {
273            self.frontend_readiness.event |= events;
274        }
275    }
276
277    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
278        if self.frontend_token == token {
279            self.container_frontend_timeout.triggered();
280            return StateResult::CloseSession;
281        }
282
283        error!(
284            "Expect state: got timeout for an invalid token: {:?}",
285            token
286        );
287        StateResult::CloseSession
288    }
289
290    fn cancel_timeouts(&mut self) {
291        self.container_frontend_timeout.cancel();
292    }
293
294    fn print_state(&self, context: &str) {
295        error!(
296            "{} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
297            context, self.frontend_token, self.frontend_readiness
298        );
299    }
300}
301
302#[cfg(test)]
303mod expect_test {
304    use super::*;
305    use mio::net::TcpListener;
306    use rusty_ulid::Ulid;
307    use std::{
308        io::Write,
309        net::TcpStream as StdTcpStream,
310        net::{IpAddr, Ipv4Addr, SocketAddr},
311        sync::{Arc, Barrier},
312        thread::{self, JoinHandle},
313        time::Duration,
314    };
315
316    use crate::protocol::proxy_protocol::header::*;
317
318    // Flow diagram of the test below
319    //                [connect]   [send proxy protocol]
320    //upfront proxy  ----------------------X
321    //              /     |           |
322    //  sozu     ---------v-----------v----X
323    #[test]
324    fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
325        setup_test_logger!();
326        let middleware_addr: SocketAddr = "127.0.0.1:3500".parse().expect("parse address error");
327        let barrier = Arc::new(Barrier::new(2));
328
329        let upfront = start_upfront_middleware(middleware_addr, barrier.clone());
330        start_middleware(middleware_addr, barrier);
331
332        upfront.join().expect("should join");
333    }
334
335    // Accept connection from an upfront proxy and expect to read a proxy protocol header in this stream.
336    fn start_middleware(middleware_addr: SocketAddr, barrier: Arc<Barrier>) {
337        let upfront_middleware_conn_listener = TcpListener::bind(middleware_addr)
338            .expect("could not accept upfront middleware connection");
339        let session_stream;
340        barrier.wait();
341
342        // mio::TcpListener use a nonblocking mode so we have to loop on accept
343        loop {
344            if let Ok((stream, _addr)) = upfront_middleware_conn_listener.accept() {
345                session_stream = stream;
346                break;
347            }
348        }
349
350        let mut session_metrics = SessionMetrics::new(None);
351        let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
352        let mut expect_pp = ExpectProxyProtocol::new(
353            container_frontend_timeout,
354            session_stream,
355            Token(0),
356            Ulid::generate(),
357        );
358
359        let mut res = SessionResult::Continue;
360        while res == SessionResult::Continue {
361            res = expect_pp.readable(&mut session_metrics);
362        }
363
364        if res != SessionResult::Upgrade {
365            panic!("Should receive a complete proxy protocol header, res = {res:?}");
366        };
367    }
368
369    // Connect to the next middleware and send a proxy protocol header
370    fn start_upfront_middleware(
371        next_middleware_addr: SocketAddr,
372        barrier: Arc<Barrier>,
373    ) -> JoinHandle<()> {
374        thread::spawn(move || {
375            let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
376            let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
377            let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();
378
379            barrier.wait();
380            match StdTcpStream::connect(next_middleware_addr) {
381                Ok(mut stream) => {
382                    stream.write(&proxy_protocol).unwrap();
383                }
384                Err(e) => panic!("could not connect to the next middleware: {e}"),
385            };
386        })
387    }
388}