Skip to main content

sozu_lib/protocol/proxy_protocol/
relay.rs

1//! PROXY-v2 relay state.
2//!
3//! Reads an inbound PROXY-v2 header (`parse_v2_header`) and forwards the
4//! captured bytes verbatim onto a freshly opened backend `TcpStream` before
5//! the rest of the byte stream begins. Used when Sōzu sits between two
6//! PROXY-aware peers and must preserve the original client identity.
7
8use std::{cell::RefCell, io::Write, rc::Rc};
9
10use mio::{Token, net::TcpStream};
11use nom::{Err, Offset};
12use rusty_ulid::Ulid;
13use sozu_command::logging::ansi_palette;
14
15use crate::metrics::names;
16use crate::{
17    Protocol, Readiness, SessionMetrics, SessionResult,
18    pool::Checkout,
19    protocol::{
20        pipe::{Pipe, WebSocketContext},
21        proxy_protocol::{header::ProxyAddr, parser::parse_v2_header},
22    },
23    socket::{SocketHandler, SocketResult},
24    sozu_command::ready::Ready,
25    tcp::TcpListener,
26};
27
/// Log prefix for lines emitted from this module when no per-session state
/// is available. Expands to a `String` holding the `PROXY-RELAY` label,
/// wrapped between the first two entries returned by `ansi_palette()`, and
/// followed by the `\t >>>` separator.
#[allow(unused_macros)]
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries are needed for the bare label.
        let palette = ansi_palette();
        format!(
            "{open}PROXY-RELAY{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
39
/// Log prefix for lines emitted while a [`RelayProxyProtocol`] is in scope.
/// Expands to a `String` of the form
/// `PROXY-RELAY\tSession(frontend=…, backend=…, front_readiness=…, back_readiness=…)\t >>>`.
/// The session bracket carries the frontend token, the backend token
/// (`<none>` while it is unset) and both readiness values.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // The backend token is optional until the caller sets it.
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{open}PROXY-RELAY{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}front_readiness{reset}={white}{front_readiness}{reset}, {gray}back_readiness{reset}={white}{back_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            frontend = $self.frontend_token.0,
            backend = backend,
            front_readiness = $self.frontend_readiness,
            back_readiness = $self.backend_readiness,
        )
    }};
}
62
/// Session state that relays an inbound PROXY-v2 header to the backend.
///
/// `readable` fills `frontend_buffer` from the frontend socket and runs
/// `parse_v2_header` on it; `back_writable` writes the buffered bytes to the
/// backend socket; `into_pipe` converts the state into a TCP `Pipe`.
pub struct RelayProxyProtocol<Front: SocketHandler> {
    /// Running total of the bytes `back_writable` has written to the backend;
    /// compared against `header_size` to decide when to upgrade.
    cursor_header: usize,
    /// Interest/event pair for the backend socket. Starts with
    /// `HUP | ERROR` interest; `WRITABLE` is added once the header is parsed.
    pub backend_readiness: Readiness,
    /// Token of the backend socket; `None` until `set_back_token` is called.
    pub backend_token: Option<Token>,
    /// Backend stream the header is written to, when one is set.
    pub backend: Option<TcpStream>,
    /// Buffer filled by frontend reads and drained by backend writes.
    pub frontend_buffer: Checkout,
    /// Interest/event pair for the frontend socket. Starts with
    /// `READABLE | HUP | ERROR` interest.
    pub frontend_readiness: Readiness,
    /// Token of the frontend socket.
    pub frontend_token: Token,
    /// Frontend socket handler the header is read from.
    pub frontend: Front,
    /// Length in bytes of the parsed header; `None` until parsing succeeds.
    pub header_size: Option<usize>,
    /// Identifier handed to the `Pipe` built by `into_pipe`.
    pub request_id: Ulid,
    /// Parsed PROXY-v2 address pair captured from the inbound header.
    /// `None` until the parser succeeds, or for headers that carry
    /// `Command::Local` (no encapsulated addresses). The pipe phase
    /// uses `ProxyAddr::source()` here to attribute the real client
    /// instead of the upstream PROXY-emitter's `peer_addr`.
    pub addresses: Option<ProxyAddr>,
}
81
82impl<Front: SocketHandler> RelayProxyProtocol<Front> {
83    /// Instantiate a new RelayProxyProtocol SessionState with:
84    /// - frontend_interest: READABLE | HUP | ERROR
85    /// - frontend_event: EMPTY
86    pub fn new(
87        frontend: Front,
88        frontend_token: Token,
89        request_id: Ulid,
90        backend: Option<TcpStream>,
91        front_buf: Checkout,
92    ) -> Self {
93        RelayProxyProtocol {
94            backend_readiness: Readiness {
95                interest: Ready::HUP | Ready::ERROR,
96                event: Ready::EMPTY,
97            },
98            backend_token: None,
99            backend,
100            cursor_header: 0,
101            frontend_buffer: front_buf,
102            frontend_readiness: Readiness {
103                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
104                event: Ready::EMPTY,
105            },
106            frontend_token,
107            frontend,
108            header_size: None,
109            request_id,
110            addresses: None,
111        }
112    }
113
114    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
115        let space_before = self.frontend_buffer.available_space();
116        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
117        debug!("{} read {} bytes and res={:?}", log_context!(self), sz, res);
118        // The socket can only write into the free space it was handed.
119        debug_assert!(
120            sz <= space_before,
121            "socket_read cannot return more bytes than the available space"
122        );
123
124        if sz > 0 {
125            let data_before = self.frontend_buffer.available_data();
126            self.frontend_buffer.fill(sz);
127            // `fill` admits the freshly read bytes into the readable window:
128            // available data grows by exactly `sz` (the read fit, asserted
129            // above, so `fill` does not clamp).
130            debug_assert_eq!(
131                self.frontend_buffer.available_data(),
132                data_before + sz,
133                "fill must expose exactly the bytes just read"
134            );
135
136            count!(names::backend::BYTES_IN, sz as i64);
137            metrics.bin += sz;
138
139            if res == SocketResult::Error {
140                error!(
141                    "{} front socket error, closing the connection",
142                    log_context!(self)
143                );
144                incr!(names::proxy_protocol::ERRORS);
145                self.frontend_readiness.reset();
146                self.backend_readiness.reset();
147                return SessionResult::Close;
148            }
149
150            if res == SocketResult::WouldBlock {
151                self.frontend_readiness.event.remove(Ready::READABLE);
152            }
153
154            let data_len = self.frontend_buffer.available_data();
155            let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
156                Ok((rest, header)) => {
157                    self.frontend_readiness.interest.remove(Ready::READABLE);
158                    self.backend_readiness.interest.insert(Ready::WRITABLE);
159                    // Capture the parsed addresses so the pipe phase can
160                    // attribute traffic to the real client (see
161                    // `into_pipe`). The header bytes themselves are still
162                    // forwarded verbatim onto the backend by
163                    // `back_writable`.
164                    self.addresses = Some(header.addr);
165                    let consumed = self.frontend_buffer.data().offset(rest);
166                    // The header length is the prefix the parser consumed; it
167                    // is a real (non-empty) prefix of the buffered data and can
168                    // never exceed what was buffered.
169                    debug_assert!(
170                        consumed <= data_len,
171                        "parsed header length cannot exceed the buffered bytes"
172                    );
173                    debug_assert!(consumed > 0, "a recognized v2 header is non-empty");
174                    consumed
175                }
176                Err(Err::Incomplete(_)) => return SessionResult::Continue,
177                Err(e) => {
178                    error!(
179                        "{} error parsing the proxy protocol header (error={:?}), closing the connection",
180                        log_context!(self),
181                        e
182                    );
183                    return SessionResult::Close;
184                }
185            };
186
187            self.header_size = Some(read_sz);
188            self.frontend_buffer.consume(sz);
189            return SessionResult::Continue;
190        }
191
192        SessionResult::Continue
193    }
194
195    // The header is send immediately at once upon the connection is establish
196    // and prepended before any data.
197    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
198        debug!("{} writing proxy protocol header", log_context!(self));
199
200        if let Some(ref mut socket) = self.backend {
201            if let Some(ref header_size) = self.header_size {
202                loop {
203                    let available_before = self.frontend_buffer.available_data();
204                    match socket.write(self.frontend_buffer.data()) {
205                        Ok(sz) => {
206                            // A socket write reports at most the bytes it was
207                            // offered, so the forwarded count never exceeds the
208                            // buffered header tail.
209                            debug_assert!(
210                                sz <= available_before,
211                                "socket.write cannot send more than the buffered bytes"
212                            );
213                            let cursor_before = self.cursor_header;
214                            self.cursor_header += sz;
215                            // The forwarding cursor is strictly monotonic and
216                            // tracks exactly the bytes emitted this write.
217                            debug_assert_eq!(
218                                self.cursor_header,
219                                cursor_before + sz,
220                                "header cursor advances by exactly the bytes written"
221                            );
222
223                            count!(names::backend::BACK_BYTES_OUT, sz as i64);
224                            metrics.backend_bout += sz;
225                            self.frontend_buffer.consume(sz);
226
227                            if self.cursor_header >= *header_size {
228                                info!("{} proxy protocol sent, upgrading", log_context!(self));
229                                return SessionResult::Upgrade;
230                            }
231                        }
232                        Err(e) => {
233                            incr!(names::proxy_protocol::ERRORS);
234                            self.frontend_readiness.reset();
235                            self.backend_readiness.reset();
236                            debug!("{} write error: {}", log_context!(self), e);
237                            break;
238                        }
239                    }
240                }
241            }
242        }
243        SessionResult::Continue
244    }
245
    /// Borrows the frontend `TcpStream` exposed by the `Front` handler's
    /// `socket_ref`.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend.socket_ref()
    }
249
    /// Mutably borrows the frontend `TcpStream` exposed by the `Front`
    /// handler's `socket_mut`.
    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
        self.frontend.socket_mut()
    }
253
    /// Borrows the backend stream, or returns `None` when no backend socket
    /// is set.
    pub fn back_socket(&self) -> Option<&TcpStream> {
        self.backend.as_ref()
    }
257
    /// Mutably borrows the backend stream, or returns `None` when no backend
    /// socket is set.
    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
        self.backend.as_mut()
    }
261
    /// Stores `socket` as the backend stream, replacing any stream held
    /// before.
    pub fn set_back_socket(&mut self, socket: TcpStream) {
        self.backend = Some(socket);
    }
265
    /// Returns the backend token, or `None` when it has not been set.
    pub fn back_token(&self) -> Option<Token> {
        self.backend_token
    }
269
    /// Records `token` as the backend token.
    pub fn set_back_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
273
274    pub fn into_pipe(
275        mut self,
276        back_buf: Checkout,
277        listener: Rc<RefCell<TcpListener>>,
278    ) -> Pipe<Front, TcpListener> {
279        let backend_socket = self.backend.take().unwrap();
280        // Same rationale as `ExpectProxyProtocol::into_pipe`: prefer the
281        // PROXY-v2 source over the TCP `peer_addr`. In Relay mode the
282        // upstream emitter is also the TCP peer, so without this fix
283        // the pipe phase records the LB / edge proxy instead of the
284        // real client. Falls back when the header was `Command::Local`
285        // (no addresses) or when the parser ran with `AddressFamily::Unspec`.
286        let addr = self
287            .addresses
288            .as_ref()
289            .and_then(|pa| pa.source())
290            .or_else(|| self.front_socket().peer_addr().ok());
291
292        let mut pipe = Pipe::new(
293            back_buf,
294            None,
295            Some(backend_socket),
296            None,
297            None,
298            None,
299            None,
300            self.frontend_buffer,
301            self.frontend_token,
302            self.frontend,
303            listener,
304            Protocol::TCP,
305            self.request_id,
306            self.request_id,
307            addr,
308            WebSocketContext::Tcp,
309        );
310
311        pipe.frontend_readiness.event = self.frontend_readiness.event;
312        pipe.backend_readiness.event = self.backend_readiness.event;
313
314        if let Some(back_token) = self.backend_token {
315            pipe.set_back_token(back_token);
316        }
317
318        pipe
319    }
320}