sozu_lib/protocol/proxy_protocol/
expect.rs

1use std::{cell::RefCell, rc::Rc};
2
3use mio::{net::TcpStream, *};
4use nom::{Err, HexDisplay};
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use super::{header::ProxyAddr, parser::parse_v2_header};
9use crate::{
10    Protocol, Readiness, SessionMetrics, StateResult,
11    pool::Checkout,
12    protocol::{
13        SessionResult, SessionState,
14        pipe::{Pipe, WebSocketContext},
15    },
16    socket::{SocketHandler, SocketResult},
17    sozu_command::ready::Ready,
18    tcp::TcpListener,
19    timer::TimeoutContainer,
20};
21
22#[derive(Clone, Copy)]
23pub enum HeaderLen {
24    V4,
25    V6,
26    Unix,
27}
28
29// TODO: should have a backend
30pub struct ExpectProxyProtocol<Front: SocketHandler> {
31    pub addresses: Option<ProxyAddr>,
32    pub container_frontend_timeout: TimeoutContainer,
33    frontend_buffer: [u8; 232],
34    pub frontend_readiness: Readiness,
35    pub frontend_token: Token,
36    pub frontend: Front,
37    header_len: HeaderLen,
38    index: usize,
39    pub request_id: Ulid,
40}
41
42impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
43    /// Instantiate a new ExpectProxyProtocol SessionState with:
44    /// - frontend_interest: READABLE | HUP | ERROR
45    /// - frontend_event: EMPTY
46    pub fn new(
47        container_frontend_timeout: TimeoutContainer,
48        frontend: Front,
49        frontend_token: Token,
50        request_id: Ulid,
51    ) -> Self {
52        ExpectProxyProtocol {
53            addresses: None,
54            container_frontend_timeout,
55            frontend_buffer: [0; 232],
56            frontend_readiness: Readiness {
57                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
58                event: Ready::EMPTY,
59            },
60            frontend_token,
61            frontend,
62            header_len: HeaderLen::V4,
63            index: 0,
64            request_id,
65        }
66    }
67
68    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
69        let total_len = match self.header_len {
70            HeaderLen::V4 => 28,
71            HeaderLen::V6 => 52,
72            HeaderLen::Unix => 232,
73        };
74
75        let (sz, socket_result) = self
76            .frontend
77            .socket_read(&mut self.frontend_buffer[self.index..total_len]);
78        trace!(
79            "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}, index = {}, total_len = {}",
80            self.frontend_token, sz, socket_result, self.index, total_len
81        );
82
83        if sz > 0 {
84            self.index += sz;
85
86            count!("bytes_in", sz as i64);
87            metrics.bin += sz;
88
89            if self.index == self.frontend_buffer.len() {
90                self.frontend_readiness.interest.remove(Ready::READABLE);
91            }
92        } else {
93            self.frontend_readiness.event.remove(Ready::READABLE);
94        }
95
96        match socket_result {
97            SocketResult::Error => {
98                error!(
99                    "[{:?}] (expect proxy) front socket error, closing the connection(read {}, wrote {})",
100                    self.frontend_token, metrics.bin, metrics.bout
101                );
102                incr!("proxy_protocol.errors");
103                self.frontend_readiness.reset();
104                return SessionResult::Close;
105            }
106            SocketResult::WouldBlock => {
107                self.frontend_readiness.event.remove(Ready::READABLE);
108            }
109            SocketResult::Closed | SocketResult::Continue => {}
110        }
111
112        match parse_v2_header(&self.frontend_buffer[..self.index]) {
113            Ok((rest, header)) => {
114                trace!(
115                    "got expect header: {:?}, rest.len() = {}",
116                    header,
117                    rest.len()
118                );
119                self.addresses = Some(header.addr);
120                SessionResult::Upgrade
121            }
122            Err(Err::Incomplete(_)) => {
123                match self.header_len {
124                    HeaderLen::V4 => {
125                        if self.index == 28 {
126                            self.header_len = HeaderLen::V6;
127                        }
128                    }
129                    HeaderLen::V6 => {
130                        if self.index == 52 {
131                            self.header_len = HeaderLen::Unix;
132                        }
133                    }
134                    HeaderLen::Unix => {
135                        if self.index == 232 {
136                            error!(
137                                "[{:?}] front socket parse error, closing the connection",
138                                self.frontend_token
139                            );
140                            incr!("proxy_protocol.errors");
141                            self.frontend_readiness.reset();
142                            return SessionResult::Continue;
143                        }
144                    }
145                };
146                SessionResult::Continue
147            }
148            Err(Err::Error(e)) | Err(Err::Failure(e)) => {
149                error!(
150                    "[{:?}] expect proxy protocol front socket parse error, closing the connection:\n{}",
151                    self.frontend_token,
152                    e.input.to_hex(16)
153                );
154                incr!("proxy_protocol.errors");
155                self.frontend_readiness.reset();
156                SessionResult::Close
157            }
158        }
159    }
160
161    pub fn front_socket(&self) -> &TcpStream {
162        self.frontend.socket_ref()
163    }
164
165    pub fn into_pipe(
166        self,
167        front_buf: Checkout,
168        back_buf: Checkout,
169        backend_socket: Option<TcpStream>,
170        backend_token: Option<Token>,
171        listener: Rc<RefCell<TcpListener>>,
172    ) -> Pipe<Front, TcpListener> {
173        let addr = self.front_socket().peer_addr().ok();
174
175        let mut pipe = Pipe::new(
176            back_buf,
177            None,
178            backend_socket,
179            None,
180            None,
181            Some(self.container_frontend_timeout),
182            None,
183            front_buf,
184            self.frontend_token,
185            self.frontend,
186            listener,
187            Protocol::TCP,
188            self.request_id,
189            addr,
190            WebSocketContext::Tcp,
191        );
192
193        pipe.frontend_readiness.event = self.frontend_readiness.event;
194
195        if let Some(backend_token) = backend_token {
196            pipe.set_back_token(backend_token);
197        }
198
199        pipe
200    }
201
202    pub fn log_context(&self) -> LogContext<'_> {
203        LogContext {
204            request_id: self.request_id,
205            cluster_id: None,
206            backend_id: None,
207        }
208    }
209}
210
211impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
212    fn ready(
213        &mut self,
214        _session: Rc<RefCell<dyn crate::ProxySession>>,
215        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
216        metrics: &mut SessionMetrics,
217    ) -> SessionResult {
218        let mut counter = 0;
219
220        if self.frontend_readiness.event.is_hup() {
221            return SessionResult::Close;
222        }
223
224        while counter < MAX_LOOP_ITERATIONS {
225            let frontend_interest = self.frontend_readiness.filter_interest();
226
227            trace!(
228                "PROXY\t{} {:?} {:?} -> None",
229                self.log_context(),
230                self.frontend_token,
231                self.frontend_readiness
232            );
233
234            if frontend_interest.is_empty() {
235                break;
236            }
237
238            if frontend_interest.is_readable() {
239                let session_result = self.readable(metrics);
240                if session_result != SessionResult::Continue {
241                    return session_result;
242                }
243            }
244
245            if frontend_interest.is_error() {
246                error!(
247                    "PROXY session {:?} front error, disconnecting",
248                    self.frontend_token
249                );
250                self.frontend_readiness.interest = Ready::EMPTY;
251
252                return SessionResult::Close;
253            }
254
255            counter += 1;
256        }
257
258        if counter >= MAX_LOOP_ITERATIONS {
259            error!(
260                "PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
261                self.frontend_token, MAX_LOOP_ITERATIONS
262            );
263            incr!("http.infinite_loop.error");
264
265            self.print_state("");
266
267            return SessionResult::Close;
268        }
269
270        SessionResult::Continue
271    }
272
273    fn update_readiness(&mut self, token: Token, events: Ready) {
274        if self.frontend_token == token {
275            self.frontend_readiness.event |= events;
276        }
277    }
278
279    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
280        if self.frontend_token == token {
281            self.container_frontend_timeout.triggered();
282            return StateResult::CloseSession;
283        }
284
285        error!(
286            "Expect state: got timeout for an invalid token: {:?}",
287            token
288        );
289        StateResult::CloseSession
290    }
291
292    fn cancel_timeouts(&mut self) {
293        self.container_frontend_timeout.cancel();
294    }
295
296    fn print_state(&self, context: &str) {
297        error!(
298            "{} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
299            context, self.frontend_token, self.frontend_readiness
300        );
301    }
302}
303
304#[cfg(test)]
305mod expect_test {
306    use std::{
307        io::Write,
308        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream as StdTcpStream},
309        sync::{Arc, Barrier},
310        thread::{self, JoinHandle},
311        time::Duration,
312    };
313
314    use mio::net::TcpListener;
315    use rusty_ulid::Ulid;
316
317    use super::*;
318    use crate::protocol::proxy_protocol::header::*;
319
320    // Flow diagram of the test below
321    //                [connect]   [send proxy protocol]
322    //upfront proxy  ----------------------X
323    //              /     |           |
324    //  sozu     ---------v-----------v----X
325    #[test]
326    fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
327        setup_test_logger!();
328        let middleware_addr: SocketAddr = "127.0.0.1:3500".parse().expect("parse address error");
329        let barrier = Arc::new(Barrier::new(2));
330
331        let upfront = start_upfront_middleware(middleware_addr, barrier.clone());
332        start_middleware(middleware_addr, barrier);
333
334        upfront.join().expect("should join");
335    }
336
337    // Accept connection from an upfront proxy and expect to read a proxy protocol header in this stream.
338    fn start_middleware(middleware_addr: SocketAddr, barrier: Arc<Barrier>) {
339        let upfront_middleware_conn_listener = TcpListener::bind(middleware_addr)
340            .expect("could not accept upfront middleware connection");
341        let session_stream;
342        barrier.wait();
343
344        // mio::TcpListener use a nonblocking mode so we have to loop on accept
345        loop {
346            if let Ok((stream, _addr)) = upfront_middleware_conn_listener.accept() {
347                session_stream = stream;
348                break;
349            }
350        }
351
352        let mut session_metrics = SessionMetrics::new(None);
353        let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
354        let mut expect_pp = ExpectProxyProtocol::new(
355            container_frontend_timeout,
356            session_stream,
357            Token(0),
358            Ulid::generate(),
359        );
360
361        let mut res = SessionResult::Continue;
362        while res == SessionResult::Continue {
363            res = expect_pp.readable(&mut session_metrics);
364        }
365
366        if res != SessionResult::Upgrade {
367            panic!("Should receive a complete proxy protocol header, res = {res:?}");
368        };
369    }
370
371    // Connect to the next middleware and send a proxy protocol header
372    fn start_upfront_middleware(
373        next_middleware_addr: SocketAddr,
374        barrier: Arc<Barrier>,
375    ) -> JoinHandle<()> {
376        thread::spawn(move || {
377            let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
378            let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
379            let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();
380
381            barrier.wait();
382            match StdTcpStream::connect(next_middleware_addr) {
383                Ok(mut stream) => {
384                    stream.write(&proxy_protocol).unwrap();
385                }
386                Err(e) => panic!("could not connect to the next middleware: {e}"),
387            };
388        })
389    }
390}