Skip to main content

sozu_lib/protocol/proxy_protocol/
expect.rs

1//! Inbound PROXY-v2 expectation state.
2//!
3//! Reads bytes from the freshly accepted front-end socket until a complete
4//! PROXY v2 header has been parsed (`parse_v2_header`), captures the peer
5//! address pair, and transitions the session to the configured downstream
6//! protocol (typically `Pipe` for TCP listeners). Bounded by
7//! `MAX_LOOP_ITERATIONS` to defend against malformed/empty headers.
8
9use std::{cell::RefCell, rc::Rc};
10
11use mio::{net::TcpStream, *};
12use nom::{Err, HexDisplay};
13use rusty_ulid::Ulid;
14use sozu_command::{
15    config::MAX_LOOP_ITERATIONS,
16    logging::{LogContext, ansi_palette},
17};
18
19use super::{header::ProxyAddr, parser::parse_v2_header};
20use crate::metrics::names;
21use crate::{
22    Protocol, Readiness, SessionMetrics, StateResult,
23    pool::Checkout,
24    protocol::{
25        SessionResult, SessionState,
26        pipe::{Pipe, WebSocketContext},
27    },
28    socket::{SocketHandler, SocketResult},
29    sozu_command::ready::Ready,
30    tcp::TcpListener,
31    timer::TimeoutContainer,
32};
33
/// Log prefix for lines emitted by this module when no session is available.
///
/// Renders a `PROXY-EXPECT` label (the same label every protocol uses for its
/// own name slot), wrapped in the open/reset escapes returned by
/// `ansi_palette()`, followed by `\t >>>`.
macro_rules! log_module_context {
    () => {{
        let (open, reset, ..) = ansi_palette();
        format!("{open}PROXY-EXPECT{reset}\t >>>")
    }};
}
48
/// Builds the per-session log prefix used while an [`ExpectProxyProtocol`]
/// is in scope.
///
/// The rendered envelope is
/// `<log context>\tPROXY-EXPECT\tSession(frontend=…, index=…, readiness=…)\t >>>`,
/// colored with the escapes from `ansi_palette()`, so these lines can be
/// grepped next to the `MUX-*`, `RUSTLS` and `PIPE` lines of the same session.
/// The session expression is evaluated a single time.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let session = &$self;
        let ctx = session.log_context();
        let frontend = session.frontend_token.0;
        let index = session.index;
        let readiness = &session.frontend_readiness;
        format!(
            "{gray}{ctx}{reset}\t{open}PROXY-EXPECT{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}index{reset}={white}{index}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>"
        )
    }};
}
71
/// Read-window stage used while waiting for a PROXY v2 header.
///
/// Each stage bounds how many bytes are buffered before the parser is
/// retried: 28 bytes for `V4`, 52 for `V6` and 232 for `Unix`. These are
/// the total PROXY v2 header sizes for the IPv4, IPv6 and UNIX address
/// families. When a full window still yields an incomplete header, the
/// reader advances to the next stage.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HeaderLen {
    /// 28-byte window (IPv4 address block).
    V4,
    /// 52-byte window (IPv6 address block).
    V6,
    /// 232-byte window (UNIX address block), the largest header accepted.
    Unix,
}
78
79// TODO: should have a backend
80pub struct ExpectProxyProtocol<Front: SocketHandler> {
81    pub addresses: Option<ProxyAddr>,
82    pub container_frontend_timeout: TimeoutContainer,
83    frontend_buffer: [u8; 232],
84    pub frontend_readiness: Readiness,
85    pub frontend_token: Token,
86    pub frontend: Front,
87    header_len: HeaderLen,
88    index: usize,
89    pub request_id: Ulid,
90}
91
92impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
93    /// Instantiate a new ExpectProxyProtocol SessionState with:
94    /// - frontend_interest: READABLE | HUP | ERROR
95    /// - frontend_event: EMPTY
96    pub fn new(
97        container_frontend_timeout: TimeoutContainer,
98        frontend: Front,
99        frontend_token: Token,
100        request_id: Ulid,
101    ) -> Self {
102        ExpectProxyProtocol {
103            addresses: None,
104            container_frontend_timeout,
105            frontend_buffer: [0; 232],
106            frontend_readiness: Readiness {
107                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
108                event: Ready::EMPTY,
109            },
110            frontend_token,
111            frontend,
112            header_len: HeaderLen::V4,
113            index: 0,
114            request_id,
115        }
116    }
117
118    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
119        let total_len = match self.header_len {
120            HeaderLen::V4 => 28,
121            HeaderLen::V6 => 52,
122            HeaderLen::Unix => 232,
123        };
124
125        let (sz, socket_result) = self
126            .frontend
127            .socket_read(&mut self.frontend_buffer[self.index..total_len]);
128        trace!(
129            "{} read {} bytes and res={:?}, total_len = {}",
130            log_context!(self),
131            sz,
132            socket_result,
133            total_len
134        );
135
136        if sz > 0 {
137            self.index += sz;
138
139            count!(names::backend::BYTES_IN, sz as i64);
140            metrics.bin += sz;
141
142            if self.index == self.frontend_buffer.len() {
143                self.frontend_readiness.interest.remove(Ready::READABLE);
144            }
145        } else {
146            self.frontend_readiness.event.remove(Ready::READABLE);
147        }
148
149        match socket_result {
150            SocketResult::Error => {
151                error!(
152                    "{} front socket error, closing the connection (read {}, wrote {})",
153                    log_context!(self),
154                    metrics.bin,
155                    metrics.bout
156                );
157                incr!(names::proxy_protocol::ERRORS);
158                self.frontend_readiness.reset();
159                return SessionResult::Close;
160            }
161            SocketResult::WouldBlock => {
162                self.frontend_readiness.event.remove(Ready::READABLE);
163            }
164            SocketResult::Closed => {
165                // Socket closed before any proxy-protocol bytes were received.
166                // This is the typical HAProxy bare TCP healthcheck pattern
167                // (SYN/ACK/FIN without send-proxy). Close immediately instead
168                // of waiting for request_timeout (default 10s), which would
169                // create zombie sessions consuming nb_connections quota.
170                if self.index == 0 {
171                    trace!(
172                        "{} socket closed with 0 bytes, closing session",
173                        log_context!(self)
174                    );
175                    return SessionResult::Close;
176                }
177            }
178            SocketResult::Continue => {}
179        }
180
181        match parse_v2_header(&self.frontend_buffer[..self.index]) {
182            Ok((rest, header)) => {
183                trace!(
184                    "{} got expect header: {:?}, rest.len() = {}",
185                    log_context!(self),
186                    header,
187                    rest.len()
188                );
189                self.addresses = Some(header.addr);
190                SessionResult::Upgrade
191            }
192            Err(Err::Incomplete(_)) => {
193                match self.header_len {
194                    HeaderLen::V4 => {
195                        if self.index == 28 {
196                            self.header_len = HeaderLen::V6;
197                        }
198                    }
199                    HeaderLen::V6 => {
200                        if self.index == 52 {
201                            self.header_len = HeaderLen::Unix;
202                        }
203                    }
204                    HeaderLen::Unix => {
205                        if self.index == 232 {
206                            error!(
207                                "{} proxy protocol header exceeds maximum size (232 bytes), closing",
208                                log_context!(self)
209                            );
210                            incr!(names::proxy_protocol::ERRORS);
211                            self.frontend_readiness.reset();
212                            return SessionResult::Close;
213                        }
214                    }
215                };
216                SessionResult::Continue
217            }
218            Err(Err::Error(e)) | Err(Err::Failure(e)) => {
219                error!(
220                    "{} parse error, closing the connection:\n{}",
221                    log_context!(self),
222                    e.input.to_hex(16)
223                );
224                incr!(names::proxy_protocol::ERRORS);
225                self.frontend_readiness.reset();
226                SessionResult::Close
227            }
228        }
229    }
230
231    pub fn front_socket(&self) -> &TcpStream {
232        self.frontend.socket_ref()
233    }
234
235    pub fn into_pipe(
236        self,
237        front_buf: Checkout,
238        back_buf: Checkout,
239        backend_socket: Option<TcpStream>,
240        backend_token: Option<Token>,
241        listener: Rc<RefCell<TcpListener>>,
242    ) -> Pipe<Front, TcpListener> {
243        // Prefer the source address parsed from the PROXY-v2 header over
244        // the TCP `peer_addr` so the pipe phase records the real client
245        // — `peer_addr` here is the upstream PROXY-emitter (an LB / edge
246        // proxy / health-check probe), not the originating client.
247        // Falls back to `peer_addr` when the header carried `Command::Local`
248        // (no encapsulated addresses) or when the parser ran with
249        // `AddressFamily::Unspec`.
250        let addr = self
251            .addresses
252            .as_ref()
253            .and_then(|pa| pa.source())
254            .or_else(|| self.front_socket().peer_addr().ok());
255
256        let mut pipe = Pipe::new(
257            back_buf,
258            None,
259            backend_socket,
260            None,
261            None,
262            Some(self.container_frontend_timeout),
263            None,
264            front_buf,
265            self.frontend_token,
266            self.frontend,
267            listener,
268            Protocol::TCP,
269            self.request_id,
270            self.request_id,
271            addr,
272            WebSocketContext::Tcp,
273        );
274
275        pipe.frontend_readiness.event = self.frontend_readiness.event;
276
277        if let Some(backend_token) = backend_token {
278            pipe.set_back_token(backend_token);
279        }
280
281        pipe
282    }
283
284    pub fn log_context(&self) -> LogContext<'_> {
285        LogContext {
286            session_id: self.request_id,
287            request_id: None,
288            cluster_id: None,
289            backend_id: None,
290        }
291    }
292}
293
294impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
295    fn ready(
296        &mut self,
297        _session: Rc<RefCell<dyn crate::ProxySession>>,
298        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
299        metrics: &mut SessionMetrics,
300    ) -> SessionResult {
301        let mut counter = 0;
302
303        if self.frontend_readiness.event.is_hup() {
304            return SessionResult::Close;
305        }
306
307        while counter < MAX_LOOP_ITERATIONS {
308            let frontend_interest = self.frontend_readiness.filter_interest();
309
310            trace!(
311                "{} {:?} -> None",
312                log_context!(self),
313                self.frontend_readiness
314            );
315
316            if frontend_interest.is_empty() {
317                break;
318            }
319
320            if frontend_interest.is_readable() {
321                let session_result = self.readable(metrics);
322                if session_result != SessionResult::Continue {
323                    return session_result;
324                }
325            }
326
327            if frontend_interest.is_error() {
328                error!("{} front error, disconnecting", log_context!(self));
329                self.frontend_readiness.interest = Ready::EMPTY;
330
331                return SessionResult::Close;
332            }
333
334            counter += 1;
335        }
336
337        if counter >= MAX_LOOP_ITERATIONS {
338            error!(
339                "{} handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
340                log_context!(self),
341                MAX_LOOP_ITERATIONS
342            );
343            incr!(names::http::INFINITE_LOOP_ERROR);
344
345            self.print_state("");
346
347            return SessionResult::Close;
348        }
349
350        SessionResult::Continue
351    }
352
353    fn update_readiness(&mut self, token: Token, events: Ready) {
354        if self.frontend_token == token {
355            self.frontend_readiness.event |= events;
356        }
357    }
358
359    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
360        if self.frontend_token == token {
361            self.container_frontend_timeout.triggered();
362            return StateResult::CloseSession;
363        }
364
365        error!(
366            "{} got timeout for an invalid token: {:?}",
367            log_module_context!(),
368            token
369        );
370        StateResult::CloseSession
371    }
372
373    fn cancel_timeouts(&mut self) {
374        self.container_frontend_timeout.cancel();
375    }
376
377    fn print_state(&self, context: &str) {
378        error!(
379            "{} {} Session(Expect)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
380            log_context!(self),
381            context,
382            self.frontend_token,
383            self.frontend_readiness
384        );
385    }
386}
387
#[cfg(test)]
mod expect_test {
    use std::{
        io::{ErrorKind, Write},
        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream as StdTcpStream},
        thread::{self, JoinHandle},
        time::{Duration, Instant},
    };

    use mio::net::TcpListener;
    use rusty_ulid::Ulid;

    use super::*;
    use crate::protocol::proxy_protocol::header::*;

    /// Upper bound on how long the middleware side waits for the client.
    const TEST_DEADLINE: Duration = Duration::from_secs(10);

    // Flow diagram of the test below
    //                [connect]   [send proxy protocol]
    //upfront proxy  ----------------------X
    //              /     |           |
    //  sozu     ---------v-----------v----X
    #[test]
    fn middleware_should_receive_proxy_protocol_header_from_an_upfront_middleware() {
        setup_test_logger!();
        // Bind to an ephemeral port before spawning the client. This avoids
        // collisions on a fixed port with other processes or parallel test
        // runs, and guarantees the listener exists before the client
        // connects, so no barrier is needed.
        let listener = TcpListener::bind("127.0.0.1:0".parse().expect("parse address error"))
            .expect("could not bind the middleware listener");
        let middleware_addr = listener
            .local_addr()
            .expect("bound listener should expose its local address");

        let upfront = start_upfront_middleware(middleware_addr);
        start_middleware(listener);

        upfront.join().expect("should join");
    }

    /// Accepts one connection from the upfront proxy on `listener`. It then
    /// drives an `ExpectProxyProtocol` over that connection until a complete
    /// PROXY header has been parsed.
    fn start_middleware(listener: TcpListener) {
        let deadline = Instant::now() + TEST_DEADLINE;

        // mio listeners are non-blocking: retry `accept` until the client
        // shows up, yielding instead of hot-spinning, and surface real errors.
        let session_stream = loop {
            match listener.accept() {
                Ok((stream, _addr)) => break stream,
                Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) => {
                    assert!(
                        Instant::now() < deadline,
                        "timed out waiting for the upfront middleware to connect"
                    );
                    thread::yield_now();
                }
                Err(e) => panic!("could not accept upfront middleware connection: {e}"),
            }
        };

        let mut session_metrics = SessionMetrics::new(None);
        let container_frontend_timeout = TimeoutContainer::new(Duration::from_secs(10), Token(0));
        let mut expect_pp = ExpectProxyProtocol::new(
            container_frontend_timeout,
            session_stream,
            Token(0),
            Ulid::generate(),
        );

        let res = loop {
            match expect_pp.readable(&mut session_metrics) {
                SessionResult::Continue => {
                    assert!(
                        Instant::now() < deadline,
                        "timed out waiting for the proxy protocol header"
                    );
                    thread::yield_now();
                }
                other => break other,
            }
        };

        if res != SessionResult::Upgrade {
            panic!("Should receive a complete proxy protocol header, res = {res:?}");
        }
    }

    /// Connects to `next_middleware_addr` from a background thread and writes
    /// a PROXY v2 header carrying an IPv4 source/destination pair.
    fn start_upfront_middleware(next_middleware_addr: SocketAddr) -> JoinHandle<()> {
        thread::spawn(move || {
            let src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(125, 25, 10, 1)), 8080);
            let dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 4, 5, 8)), 4200);
            let proxy_protocol = HeaderV2::new(Command::Local, src_addr, dst_addr).into_bytes();

            let mut stream = StdTcpStream::connect(next_middleware_addr)
                .unwrap_or_else(|e| panic!("could not connect to the next middleware: {e}"));
            stream
                .write_all(&proxy_protocol)
                .expect("could not write the proxy protocol header");
        })
    }
}