sozu_lib/protocol/proxy_protocol/
relay.rs1use std::{cell::RefCell, io::Write, rc::Rc};
9
10use mio::{Token, net::TcpStream};
11use nom::{Err, Offset};
12use rusty_ulid::Ulid;
13use sozu_command::logging::ansi_palette;
14
15use crate::metrics::names;
16use crate::{
17 Protocol, Readiness, SessionMetrics, SessionResult,
18 pool::Checkout,
19 protocol::{
20 pipe::{Pipe, WebSocketContext},
21 proxy_protocol::{header::ProxyAddr, parser::parse_v2_header},
22 },
23 socket::{SocketHandler, SocketResult},
24 sozu_command::ready::Ready,
25 tcp::TcpListener,
26};
27
28#[allow(unused_macros)]
33macro_rules! log_module_context {
34 () => {{
35 let (open, reset, _, _, _) = ansi_palette();
36 format!("{open}PROXY-RELAY{reset}\t >>>", open = open, reset = reset)
37 }};
38}
39
40macro_rules! log_context {
46 ($self:expr) => {{
47 let (open, reset, grey, gray, white) = ansi_palette();
48 format!(
49 "{open}PROXY-RELAY{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}front_readiness{reset}={white}{front_readiness}{reset}, {gray}back_readiness{reset}={white}{back_readiness}{reset})\t >>>",
50 open = open,
51 reset = reset,
52 grey = grey,
53 gray = gray,
54 white = white,
55 frontend = $self.frontend_token.0,
56 backend = $self.backend_token.map(|t| t.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
57 front_readiness = $self.frontend_readiness,
58 back_readiness = $self.backend_readiness,
59 )
60 }};
61}
62
63pub struct RelayProxyProtocol<Front: SocketHandler> {
64 cursor_header: usize,
65 pub backend_readiness: Readiness,
66 pub backend_token: Option<Token>,
67 pub backend: Option<TcpStream>,
68 pub frontend_buffer: Checkout,
69 pub frontend_readiness: Readiness,
70 pub frontend_token: Token,
71 pub frontend: Front,
72 pub header_size: Option<usize>,
73 pub request_id: Ulid,
74 pub addresses: Option<ProxyAddr>,
80}
81
82impl<Front: SocketHandler> RelayProxyProtocol<Front> {
83 pub fn new(
87 frontend: Front,
88 frontend_token: Token,
89 request_id: Ulid,
90 backend: Option<TcpStream>,
91 front_buf: Checkout,
92 ) -> Self {
93 RelayProxyProtocol {
94 backend_readiness: Readiness {
95 interest: Ready::HUP | Ready::ERROR,
96 event: Ready::EMPTY,
97 },
98 backend_token: None,
99 backend,
100 cursor_header: 0,
101 frontend_buffer: front_buf,
102 frontend_readiness: Readiness {
103 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
104 event: Ready::EMPTY,
105 },
106 frontend_token,
107 frontend,
108 header_size: None,
109 request_id,
110 addresses: None,
111 }
112 }
113
114 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
115 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
116 debug!("{} read {} bytes and res={:?}", log_context!(self), sz, res);
117
118 if sz > 0 {
119 self.frontend_buffer.fill(sz);
120
121 count!(names::backend::BYTES_IN, sz as i64);
122 metrics.bin += sz;
123
124 if res == SocketResult::Error {
125 error!(
126 "{} front socket error, closing the connection",
127 log_context!(self)
128 );
129 incr!(names::proxy_protocol::ERRORS);
130 self.frontend_readiness.reset();
131 self.backend_readiness.reset();
132 return SessionResult::Close;
133 }
134
135 if res == SocketResult::WouldBlock {
136 self.frontend_readiness.event.remove(Ready::READABLE);
137 }
138
139 let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
140 Ok((rest, header)) => {
141 self.frontend_readiness.interest.remove(Ready::READABLE);
142 self.backend_readiness.interest.insert(Ready::WRITABLE);
143 self.addresses = Some(header.addr);
149 self.frontend_buffer.data().offset(rest)
150 }
151 Err(Err::Incomplete(_)) => return SessionResult::Continue,
152 Err(e) => {
153 error!(
154 "{} error parsing the proxy protocol header (error={:?}), closing the connection",
155 log_context!(self),
156 e
157 );
158 return SessionResult::Close;
159 }
160 };
161
162 self.header_size = Some(read_sz);
163 self.frontend_buffer.consume(sz);
164 return SessionResult::Continue;
165 }
166
167 SessionResult::Continue
168 }
169
170 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
173 debug!("{} writing proxy protocol header", log_context!(self));
174
175 if let Some(ref mut socket) = self.backend {
176 if let Some(ref header_size) = self.header_size {
177 loop {
178 match socket.write(self.frontend_buffer.data()) {
179 Ok(sz) => {
180 self.cursor_header += sz;
181
182 count!(names::backend::BACK_BYTES_OUT, sz as i64);
183 metrics.backend_bout += sz;
184 self.frontend_buffer.consume(sz);
185
186 if self.cursor_header >= *header_size {
187 info!("{} proxy protocol sent, upgrading", log_context!(self));
188 return SessionResult::Upgrade;
189 }
190 }
191 Err(e) => {
192 incr!(names::proxy_protocol::ERRORS);
193 self.frontend_readiness.reset();
194 self.backend_readiness.reset();
195 debug!("{} write error: {}", log_context!(self), e);
196 break;
197 }
198 }
199 }
200 }
201 }
202 SessionResult::Continue
203 }
204
205 pub fn front_socket(&self) -> &TcpStream {
206 self.frontend.socket_ref()
207 }
208
209 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
210 self.frontend.socket_mut()
211 }
212
213 pub fn back_socket(&self) -> Option<&TcpStream> {
214 self.backend.as_ref()
215 }
216
217 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
218 self.backend.as_mut()
219 }
220
221 pub fn set_back_socket(&mut self, socket: TcpStream) {
222 self.backend = Some(socket);
223 }
224
225 pub fn back_token(&self) -> Option<Token> {
226 self.backend_token
227 }
228
229 pub fn set_back_token(&mut self, token: Token) {
230 self.backend_token = Some(token);
231 }
232
233 pub fn into_pipe(
234 mut self,
235 back_buf: Checkout,
236 listener: Rc<RefCell<TcpListener>>,
237 ) -> Pipe<Front, TcpListener> {
238 let backend_socket = self.backend.take().unwrap();
239 let addr = self
246 .addresses
247 .as_ref()
248 .and_then(|pa| pa.source())
249 .or_else(|| self.front_socket().peer_addr().ok());
250
251 let mut pipe = Pipe::new(
252 back_buf,
253 None,
254 Some(backend_socket),
255 None,
256 None,
257 None,
258 None,
259 self.frontend_buffer,
260 self.frontend_token,
261 self.frontend,
262 listener,
263 Protocol::TCP,
264 self.request_id,
265 self.request_id,
266 addr,
267 WebSocketContext::Tcp,
268 );
269
270 pipe.frontend_readiness.event = self.frontend_readiness.event;
271 pipe.backend_readiness.event = self.backend_readiness.event;
272
273 if let Some(back_token) = self.backend_token {
274 pipe.set_back_token(back_token);
275 }
276
277 pipe
278 }
279}