Skip to main content

sozu_lib/protocol/proxy_protocol/
relay.rs

1//! PROXY-v2 relay state.
2//!
3//! Reads an inbound PROXY-v2 header (`parse_v2_header`) and forwards the
4//! captured bytes verbatim onto a freshly opened backend `TcpStream` before
5//! the rest of the byte stream begins. Used when Sōzu sits between two
6//! PROXY-aware peers and must preserve the original client identity.
7
8use std::{cell::RefCell, io::Write, rc::Rc};
9
10use mio::{Token, net::TcpStream};
11use nom::{Err, Offset};
12use rusty_ulid::Ulid;
13use sozu_command::logging::ansi_palette;
14
15use crate::metrics::names;
16use crate::{
17    Protocol, Readiness, SessionMetrics, SessionResult,
18    pool::Checkout,
19    protocol::{
20        pipe::{Pipe, WebSocketContext},
21        proxy_protocol::{header::ProxyAddr, parser::parse_v2_header},
22    },
23    socket::{SocketHandler, SocketResult},
24    sozu_command::ready::Ready,
25    tcp::TcpListener,
26};
27
28/// Module-level prefix used on every log line emitted from this module when
29/// no per-session state is in scope. Produces a bold bright-white
30/// `PROXY-RELAY` label (uniform across every protocol) when the logger is in
31/// colored mode.
32#[allow(unused_macros)]
33macro_rules! log_module_context {
34    () => {{
35        let (open, reset, _, _, _) = ansi_palette();
36        format!("{open}PROXY-RELAY{reset}\t >>>", open = open, reset = reset)
37    }};
38}
39
40/// Per-session prefix for log lines emitted with a [`RelayProxyProtocol`] in
41/// scope. Renders the canonical
42/// `\tPROXY-RELAY\tSession(...)\t >>>` envelope. The relay state has no
43/// `request_id`-keyed [`LogContext`] (the caller-side ulid is not yet bound
44/// to a request); the bracket carries the front/back tokens instead.
45macro_rules! log_context {
46    ($self:expr) => {{
47        let (open, reset, grey, gray, white) = ansi_palette();
48        format!(
49            "{open}PROXY-RELAY{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}front_readiness{reset}={white}{front_readiness}{reset}, {gray}back_readiness{reset}={white}{back_readiness}{reset})\t >>>",
50            open = open,
51            reset = reset,
52            grey = grey,
53            gray = gray,
54            white = white,
55            frontend = $self.frontend_token.0,
56            backend = $self.backend_token.map(|t| t.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
57            front_readiness = $self.frontend_readiness,
58            back_readiness = $self.backend_readiness,
59        )
60    }};
61}
62
63pub struct RelayProxyProtocol<Front: SocketHandler> {
64    cursor_header: usize,
65    pub backend_readiness: Readiness,
66    pub backend_token: Option<Token>,
67    pub backend: Option<TcpStream>,
68    pub frontend_buffer: Checkout,
69    pub frontend_readiness: Readiness,
70    pub frontend_token: Token,
71    pub frontend: Front,
72    pub header_size: Option<usize>,
73    pub request_id: Ulid,
74    /// Parsed PROXY-v2 address pair captured from the inbound header.
75    /// `None` until the parser succeeds, or for headers that carry
76    /// `Command::Local` (no encapsulated addresses). The pipe phase
77    /// uses `ProxyAddr::source()` here to attribute the real client
78    /// instead of the upstream PROXY-emitter's `peer_addr`.
79    pub addresses: Option<ProxyAddr>,
80}
81
82impl<Front: SocketHandler> RelayProxyProtocol<Front> {
83    /// Instantiate a new RelayProxyProtocol SessionState with:
84    /// - frontend_interest: READABLE | HUP | ERROR
85    /// - frontend_event: EMPTY
86    pub fn new(
87        frontend: Front,
88        frontend_token: Token,
89        request_id: Ulid,
90        backend: Option<TcpStream>,
91        front_buf: Checkout,
92    ) -> Self {
93        RelayProxyProtocol {
94            backend_readiness: Readiness {
95                interest: Ready::HUP | Ready::ERROR,
96                event: Ready::EMPTY,
97            },
98            backend_token: None,
99            backend,
100            cursor_header: 0,
101            frontend_buffer: front_buf,
102            frontend_readiness: Readiness {
103                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
104                event: Ready::EMPTY,
105            },
106            frontend_token,
107            frontend,
108            header_size: None,
109            request_id,
110            addresses: None,
111        }
112    }
113
114    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
115        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
116        debug!("{} read {} bytes and res={:?}", log_context!(self), sz, res);
117
118        if sz > 0 {
119            self.frontend_buffer.fill(sz);
120
121            count!(names::backend::BYTES_IN, sz as i64);
122            metrics.bin += sz;
123
124            if res == SocketResult::Error {
125                error!(
126                    "{} front socket error, closing the connection",
127                    log_context!(self)
128                );
129                incr!(names::proxy_protocol::ERRORS);
130                self.frontend_readiness.reset();
131                self.backend_readiness.reset();
132                return SessionResult::Close;
133            }
134
135            if res == SocketResult::WouldBlock {
136                self.frontend_readiness.event.remove(Ready::READABLE);
137            }
138
139            let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
140                Ok((rest, header)) => {
141                    self.frontend_readiness.interest.remove(Ready::READABLE);
142                    self.backend_readiness.interest.insert(Ready::WRITABLE);
143                    // Capture the parsed addresses so the pipe phase can
144                    // attribute traffic to the real client (see
145                    // `into_pipe`). The header bytes themselves are still
146                    // forwarded verbatim onto the backend by
147                    // `back_writable`.
148                    self.addresses = Some(header.addr);
149                    self.frontend_buffer.data().offset(rest)
150                }
151                Err(Err::Incomplete(_)) => return SessionResult::Continue,
152                Err(e) => {
153                    error!(
154                        "{} error parsing the proxy protocol header (error={:?}), closing the connection",
155                        log_context!(self),
156                        e
157                    );
158                    return SessionResult::Close;
159                }
160            };
161
162            self.header_size = Some(read_sz);
163            self.frontend_buffer.consume(sz);
164            return SessionResult::Continue;
165        }
166
167        SessionResult::Continue
168    }
169
170    // The header is send immediately at once upon the connection is establish
171    // and prepended before any data.
172    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
173        debug!("{} writing proxy protocol header", log_context!(self));
174
175        if let Some(ref mut socket) = self.backend {
176            if let Some(ref header_size) = self.header_size {
177                loop {
178                    match socket.write(self.frontend_buffer.data()) {
179                        Ok(sz) => {
180                            self.cursor_header += sz;
181
182                            count!(names::backend::BACK_BYTES_OUT, sz as i64);
183                            metrics.backend_bout += sz;
184                            self.frontend_buffer.consume(sz);
185
186                            if self.cursor_header >= *header_size {
187                                info!("{} proxy protocol sent, upgrading", log_context!(self));
188                                return SessionResult::Upgrade;
189                            }
190                        }
191                        Err(e) => {
192                            incr!(names::proxy_protocol::ERRORS);
193                            self.frontend_readiness.reset();
194                            self.backend_readiness.reset();
195                            debug!("{} write error: {}", log_context!(self), e);
196                            break;
197                        }
198                    }
199                }
200            }
201        }
202        SessionResult::Continue
203    }
204
205    pub fn front_socket(&self) -> &TcpStream {
206        self.frontend.socket_ref()
207    }
208
209    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
210        self.frontend.socket_mut()
211    }
212
213    pub fn back_socket(&self) -> Option<&TcpStream> {
214        self.backend.as_ref()
215    }
216
217    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
218        self.backend.as_mut()
219    }
220
221    pub fn set_back_socket(&mut self, socket: TcpStream) {
222        self.backend = Some(socket);
223    }
224
225    pub fn back_token(&self) -> Option<Token> {
226        self.backend_token
227    }
228
229    pub fn set_back_token(&mut self, token: Token) {
230        self.backend_token = Some(token);
231    }
232
233    pub fn into_pipe(
234        mut self,
235        back_buf: Checkout,
236        listener: Rc<RefCell<TcpListener>>,
237    ) -> Pipe<Front, TcpListener> {
238        let backend_socket = self.backend.take().unwrap();
239        // Same rationale as `ExpectProxyProtocol::into_pipe`: prefer the
240        // PROXY-v2 source over the TCP `peer_addr`. In Relay mode the
241        // upstream emitter is also the TCP peer, so without this fix
242        // the pipe phase records the LB / edge proxy instead of the
243        // real client. Falls back when the header was `Command::Local`
244        // (no addresses) or when the parser ran with `AddressFamily::Unspec`.
245        let addr = self
246            .addresses
247            .as_ref()
248            .and_then(|pa| pa.source())
249            .or_else(|| self.front_socket().peer_addr().ok());
250
251        let mut pipe = Pipe::new(
252            back_buf,
253            None,
254            Some(backend_socket),
255            None,
256            None,
257            None,
258            None,
259            self.frontend_buffer,
260            self.frontend_token,
261            self.frontend,
262            listener,
263            Protocol::TCP,
264            self.request_id,
265            self.request_id,
266            addr,
267            WebSocketContext::Tcp,
268        );
269
270        pipe.frontend_readiness.event = self.frontend_readiness.event;
271        pipe.backend_readiness.event = self.backend_readiness.event;
272
273        if let Some(back_token) = self.backend_token {
274            pipe.set_back_token(back_token);
275        }
276
277        pipe
278    }
279}