sozu_lib/protocol/proxy_protocol/
relay.rs1use std::{cell::RefCell, io::Write, rc::Rc};
9
10use mio::{Token, net::TcpStream};
11use nom::{Err, Offset};
12use rusty_ulid::Ulid;
13use sozu_command::logging::ansi_palette;
14
15use crate::metrics::names;
16use crate::{
17 Protocol, Readiness, SessionMetrics, SessionResult,
18 pool::Checkout,
19 protocol::{
20 pipe::{Pipe, WebSocketContext},
21 proxy_protocol::{header::ProxyAddr, parser::parse_v2_header},
22 },
23 socket::{SocketHandler, SocketResult},
24 sozu_command::ready::Ready,
25 tcp::TcpListener,
26};
27
/// Builds the log prefix for messages emitted by this module that are not
/// tied to a particular session: the colored `PROXY-RELAY` tag followed by
/// the `>>>` separator.
// No invocation of this macro is visible in this file, hence the lint allowance.
#[allow(unused_macros)]
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries (opening color and reset) are needed here.
        let palette = ansi_palette();
        format!(
            "{open}PROXY-RELAY{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
39
/// Builds the per-session log prefix: the colored `PROXY-RELAY` tag followed
/// by the frontend token, the backend token (`<none>` when unset) and both
/// readiness states.
///
/// The argument only needs to expose the `frontend_token`, `backend_token`,
/// `frontend_readiness` and `backend_readiness` fields. The macro reads plain
/// fields rather than calling methods, so it can be used while another field
/// of the same value is mutably borrowed.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Render the optional backend token up front so the format call stays flat.
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{open}PROXY-RELAY{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}front_readiness{reset}={white}{front_readiness}{reset}, {gray}back_readiness{reset}={white}{back_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            frontend = $self.frontend_token.0,
            backend = backend,
            front_readiness = $self.frontend_readiness,
            back_readiness = $self.backend_readiness,
        )
    }};
}
62
63pub struct RelayProxyProtocol<Front: SocketHandler> {
64 cursor_header: usize,
65 pub backend_readiness: Readiness,
66 pub backend_token: Option<Token>,
67 pub backend: Option<TcpStream>,
68 pub frontend_buffer: Checkout,
69 pub frontend_readiness: Readiness,
70 pub frontend_token: Token,
71 pub frontend: Front,
72 pub header_size: Option<usize>,
73 pub request_id: Ulid,
74 pub addresses: Option<ProxyAddr>,
80}
81
82impl<Front: SocketHandler> RelayProxyProtocol<Front> {
83 pub fn new(
87 frontend: Front,
88 frontend_token: Token,
89 request_id: Ulid,
90 backend: Option<TcpStream>,
91 front_buf: Checkout,
92 ) -> Self {
93 RelayProxyProtocol {
94 backend_readiness: Readiness {
95 interest: Ready::HUP | Ready::ERROR,
96 event: Ready::EMPTY,
97 },
98 backend_token: None,
99 backend,
100 cursor_header: 0,
101 frontend_buffer: front_buf,
102 frontend_readiness: Readiness {
103 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
104 event: Ready::EMPTY,
105 },
106 frontend_token,
107 frontend,
108 header_size: None,
109 request_id,
110 addresses: None,
111 }
112 }
113
114 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
115 let space_before = self.frontend_buffer.available_space();
116 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
117 debug!("{} read {} bytes and res={:?}", log_context!(self), sz, res);
118 debug_assert!(
120 sz <= space_before,
121 "socket_read cannot return more bytes than the available space"
122 );
123
124 if sz > 0 {
125 let data_before = self.frontend_buffer.available_data();
126 self.frontend_buffer.fill(sz);
127 debug_assert_eq!(
131 self.frontend_buffer.available_data(),
132 data_before + sz,
133 "fill must expose exactly the bytes just read"
134 );
135
136 count!(names::backend::BYTES_IN, sz as i64);
137 metrics.bin += sz;
138
139 if res == SocketResult::Error {
140 error!(
141 "{} front socket error, closing the connection",
142 log_context!(self)
143 );
144 incr!(names::proxy_protocol::ERRORS);
145 self.frontend_readiness.reset();
146 self.backend_readiness.reset();
147 return SessionResult::Close;
148 }
149
150 if res == SocketResult::WouldBlock {
151 self.frontend_readiness.event.remove(Ready::READABLE);
152 }
153
154 let data_len = self.frontend_buffer.available_data();
155 let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
156 Ok((rest, header)) => {
157 self.frontend_readiness.interest.remove(Ready::READABLE);
158 self.backend_readiness.interest.insert(Ready::WRITABLE);
159 self.addresses = Some(header.addr);
165 let consumed = self.frontend_buffer.data().offset(rest);
166 debug_assert!(
170 consumed <= data_len,
171 "parsed header length cannot exceed the buffered bytes"
172 );
173 debug_assert!(consumed > 0, "a recognized v2 header is non-empty");
174 consumed
175 }
176 Err(Err::Incomplete(_)) => return SessionResult::Continue,
177 Err(e) => {
178 error!(
179 "{} error parsing the proxy protocol header (error={:?}), closing the connection",
180 log_context!(self),
181 e
182 );
183 return SessionResult::Close;
184 }
185 };
186
187 self.header_size = Some(read_sz);
188 self.frontend_buffer.consume(sz);
189 return SessionResult::Continue;
190 }
191
192 SessionResult::Continue
193 }
194
195 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
198 debug!("{} writing proxy protocol header", log_context!(self));
199
200 if let Some(ref mut socket) = self.backend {
201 if let Some(ref header_size) = self.header_size {
202 loop {
203 let available_before = self.frontend_buffer.available_data();
204 match socket.write(self.frontend_buffer.data()) {
205 Ok(sz) => {
206 debug_assert!(
210 sz <= available_before,
211 "socket.write cannot send more than the buffered bytes"
212 );
213 let cursor_before = self.cursor_header;
214 self.cursor_header += sz;
215 debug_assert_eq!(
218 self.cursor_header,
219 cursor_before + sz,
220 "header cursor advances by exactly the bytes written"
221 );
222
223 count!(names::backend::BACK_BYTES_OUT, sz as i64);
224 metrics.backend_bout += sz;
225 self.frontend_buffer.consume(sz);
226
227 if self.cursor_header >= *header_size {
228 info!("{} proxy protocol sent, upgrading", log_context!(self));
229 return SessionResult::Upgrade;
230 }
231 }
232 Err(e) => {
233 incr!(names::proxy_protocol::ERRORS);
234 self.frontend_readiness.reset();
235 self.backend_readiness.reset();
236 debug!("{} write error: {}", log_context!(self), e);
237 break;
238 }
239 }
240 }
241 }
242 }
243 SessionResult::Continue
244 }
245
246 pub fn front_socket(&self) -> &TcpStream {
247 self.frontend.socket_ref()
248 }
249
250 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
251 self.frontend.socket_mut()
252 }
253
254 pub fn back_socket(&self) -> Option<&TcpStream> {
255 self.backend.as_ref()
256 }
257
258 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
259 self.backend.as_mut()
260 }
261
262 pub fn set_back_socket(&mut self, socket: TcpStream) {
263 self.backend = Some(socket);
264 }
265
266 pub fn back_token(&self) -> Option<Token> {
267 self.backend_token
268 }
269
270 pub fn set_back_token(&mut self, token: Token) {
271 self.backend_token = Some(token);
272 }
273
274 pub fn into_pipe(
275 mut self,
276 back_buf: Checkout,
277 listener: Rc<RefCell<TcpListener>>,
278 ) -> Pipe<Front, TcpListener> {
279 let backend_socket = self.backend.take().unwrap();
280 let addr = self
287 .addresses
288 .as_ref()
289 .and_then(|pa| pa.source())
290 .or_else(|| self.front_socket().peer_addr().ok());
291
292 let mut pipe = Pipe::new(
293 back_buf,
294 None,
295 Some(backend_socket),
296 None,
297 None,
298 None,
299 None,
300 self.frontend_buffer,
301 self.frontend_token,
302 self.frontend,
303 listener,
304 Protocol::TCP,
305 self.request_id,
306 self.request_id,
307 addr,
308 WebSocketContext::Tcp,
309 );
310
311 pipe.frontend_readiness.event = self.frontend_readiness.event;
312 pipe.backend_readiness.event = self.backend_readiness.event;
313
314 if let Some(back_token) = self.backend_token {
315 pipe.set_back_token(back_token);
316 }
317
318 pipe
319 }
320}