sozu_lib/protocol/proxy_protocol/
send.rs1use std::{
9 cell::RefCell,
10 io::{ErrorKind, Write},
11 rc::Rc,
12};
13
14use mio::{Token, net::TcpStream};
15use rusty_ulid::Ulid;
16use sozu_command::logging::ansi_palette;
17
18use crate::metrics::names;
19use crate::{
20 BackendConnectionStatus, Protocol, Readiness, SessionMetrics, SessionResult,
21 pool::Checkout,
22 protocol::{
23 pipe::{Pipe, WebSocketContext},
24 proxy_protocol::header::{Command, HeaderV2, ProxyProtocolHeader},
25 },
26 socket::SocketHandler,
27 sozu_command::ready::Ready,
28 tcp::TcpListener,
29};
30
/// Builds the log prefix used when no session state is at hand.
///
/// Expands to a `String` made of the colored `PROXY-SEND` tag, a tab and the
/// `>>>` separator. Only the first two entries of `ansi_palette()` (the
/// opening color sequence and the reset sequence) are used.
// No call site is visible in this file, hence the lint exemption.
#[allow(unused_macros)]
macro_rules! log_module_context {
    () => {{
        let palette = ansi_palette();
        format!(
            "{open}PROXY-SEND{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
42
/// Builds the log prefix for a session-bound message.
///
/// `$self` must expose `frontend_token`, `backend_token` (an `Option`),
/// `frontend_readiness` and `backend_readiness`. The expansion is a `String`
/// holding the colored `PROXY-SEND` tag followed by the frontend token, the
/// backend token (`<none>` when absent) and both readiness states.
macro_rules! log_context {
    ($self:expr) => {{
        let palette = ansi_palette();
        // A missing backend token is rendered as a placeholder, not omitted.
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{open}PROXY-SEND{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}front_readiness{reset}={white}{front_readiness}{reset}, {gray}back_readiness{reset}={white}{back_readiness}{reset})\t >>>",
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
            frontend = $self.frontend_token.0,
            backend = backend,
            front_readiness = $self.frontend_readiness,
            back_readiness = $self.backend_readiness,
        )
    }};
}
64
65pub struct SendProxyProtocol<Front: SocketHandler> {
66 cursor_header: usize,
67 pub backend_readiness: Readiness,
68 pub backend_token: Option<Token>,
69 pub backend: Option<TcpStream>,
70 pub frontend_readiness: Readiness,
71 pub frontend_token: Token,
72 pub frontend: Front,
73 pub header: Option<Vec<u8>>,
74 pub request_id: Ulid,
75}
76
77impl<Front: SocketHandler> SendProxyProtocol<Front> {
78 pub fn new(
84 frontend: Front,
85 frontend_token: Token,
86 request_id: Ulid,
87 backend: Option<TcpStream>,
88 ) -> Self {
89 SendProxyProtocol {
90 header: None,
91 frontend,
92 request_id,
93 backend,
94 frontend_token,
95 backend_token: None,
96 frontend_readiness: Readiness {
97 interest: Ready::HUP | Ready::ERROR,
98 event: Ready::EMPTY,
99 },
100 backend_readiness: Readiness {
101 interest: Ready::HUP | Ready::ERROR,
102 event: Ready::EMPTY,
103 },
104 cursor_header: 0,
105 }
106 }
107
108 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
111 debug!(
112 "{} trying to write proxy protocol header",
113 log_context!(self)
114 );
115
116 if self.header.is_none() {
118 if let Ok(local_addr) = self.front_socket().local_addr() {
119 if let Ok(frontend_addr) = self.front_socket().peer_addr() {
120 self.header = Some(
121 ProxyProtocolHeader::V2(HeaderV2::new(
122 Command::Proxy,
123 frontend_addr,
124 local_addr,
125 ))
126 .into_bytes(),
127 );
128 } else {
129 return SessionResult::Close;
130 }
131 };
132 }
133
134 if let Some(ref mut socket) = self.backend {
135 if let Some(ref mut header) = self.header {
136 loop {
137 match socket.write(&header[self.cursor_header..]) {
138 Ok(sz) => {
139 self.cursor_header += sz;
140 count!(names::backend::BACK_BYTES_OUT, sz as i64);
141 metrics.backend_bout += sz;
142
143 if self.cursor_header == header.len() {
144 debug!("{} proxy protocol sent, upgrading", log_context!(self));
145 return SessionResult::Upgrade;
146 }
147 }
148 Err(e) => match e.kind() {
149 ErrorKind::WouldBlock => {
150 self.backend_readiness.event.remove(Ready::WRITABLE);
151 return SessionResult::Continue;
152 }
153 e => {
154 incr!(names::proxy_protocol::ERRORS);
155 debug!("{} write error: {:?}", log_context!(self), e);
156 return SessionResult::Close;
157 }
158 },
159 }
160 }
161 }
162 }
163
164 error!(
165 "{} started send proxy protocol with no header or backend socket",
166 log_context!(self)
167 );
168 SessionResult::Close
169 }
170
171 pub fn front_socket(&self) -> &TcpStream {
172 self.frontend.socket_ref()
173 }
174
175 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
176 self.frontend.socket_mut()
177 }
178
179 pub fn back_socket(&self) -> Option<&TcpStream> {
180 self.backend.as_ref()
181 }
182
183 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
184 self.backend.as_mut()
185 }
186
187 pub fn set_back_socket(&mut self, socket: TcpStream) {
188 self.backend = Some(socket);
189 }
190
191 pub fn back_token(&self) -> Option<Token> {
192 self.backend_token
193 }
194
195 pub fn set_back_token(&mut self, token: Token) {
196 self.backend_token = Some(token);
197 }
198
199 pub fn set_back_connected(&mut self, status: BackendConnectionStatus) {
200 if status == BackendConnectionStatus::Connected {
201 self.backend_readiness.interest.insert(Ready::WRITABLE);
202 }
203 }
204
205 pub fn into_pipe(
206 mut self,
207 front_buf: Checkout,
208 back_buf: Checkout,
209 listener: Rc<RefCell<TcpListener>>,
210 ) -> Pipe<Front, TcpListener> {
211 let backend_socket = self.backend.take().unwrap();
212 let addr = self.front_socket().peer_addr().ok();
213
214 let mut pipe = Pipe::new(
215 back_buf,
216 None,
217 Some(backend_socket),
218 None,
219 None,
220 None,
221 None,
222 front_buf,
223 self.frontend_token,
224 self.frontend,
225 listener,
226 Protocol::TCP,
227 self.request_id,
228 self.request_id,
229 addr,
230 WebSocketContext::Tcp,
231 );
232
233 pipe.frontend_readiness = self.frontend_readiness;
234 pipe.backend_readiness = self.backend_readiness;
235
236 pipe.frontend_readiness.interest.insert(Ready::READABLE);
237 pipe.backend_readiness.interest.insert(Ready::READABLE);
238
239 if let Some(back_token) = self.backend_token {
240 pipe.set_back_token(back_token);
241 }
242
243 pipe
244 }
245}
246
#[cfg(test)]
mod send_test {
    use std::{
        io::Read,
        net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream},
        os::unix::io::{FromRawFd, IntoRawFd},
        sync::{Arc, Barrier},
        thread::{self, JoinHandle},
    };

    use mio::net::{TcpListener, TcpStream};
    use rusty_ulid::Ulid;

    use super::{
        super::parser::parse_v2_header, BackendConnectionStatus, ErrorKind, SendProxyProtocol,
        SessionMetrics, SessionResult, Token,
    };

    /// Number of bytes the backend reads before parsing the header.
    /// NOTE(review): matches a PROXY protocol v2 header carrying IPv4
    /// addresses (16-byte fixed part + 12-byte address block) — confirm
    /// against the specification.
    const EXPECTED_HEADER_LEN: usize = 28;

    /// End-to-end check: a client connects to the middleware, which sends a
    /// PROXY protocol header to the backend; the backend parses it.
    #[test]
    fn it_should_send_a_proxy_protocol_header_to_the_upstream_backend() {
        setup_test_logger!();
        let addr_client: SocketAddr = "127.0.0.1:6666".parse().expect("parse address error");
        let addr_backend: SocketAddr = "127.0.0.1:2001".parse().expect("parse address error");
        // Start barrier: client thread, backend thread and this thread.
        let barrier = Arc::new(Barrier::new(3));
        // End barrier: keeps the client connection open until the backend is done.
        let end_barrier = Arc::new(Barrier::new(2));

        start_client(addr_client, barrier.clone(), end_barrier.clone());
        let backend = start_backend(addr_backend, barrier.clone(), end_barrier);
        start_middleware(addr_client, addr_backend, barrier);

        backend
            .join()
            .expect("Couldn't join on the associated backend");
    }

    /// Runs the middleware on the calling thread: accepts the client, connects
    /// to the backend and drives `back_writable` until the header is sent.
    fn start_middleware(addr_client: SocketAddr, addr_backend: SocketAddr, barrier: Arc<Barrier>) {
        let listener = TcpListener::bind(addr_client).expect("could not accept session connection");

        barrier.wait();

        // Retry `accept` until the client connection shows up.
        let client_stream = loop {
            if let Ok((stream, _addr)) = listener.accept() {
                break stream;
            }
        };

        let std_backend =
            StdTcpStream::connect(addr_backend).expect("could not connect to the backend");
        let fd = std_backend.into_raw_fd();
        // SAFETY: `fd` was just released by `into_raw_fd()` from a connected
        // std `TcpStream`, so it is an open socket descriptor that nothing
        // else owns; the mio stream becomes its sole owner.
        let backend_stream = unsafe { TcpStream::from_raw_fd(fd) };

        let mut send_pp = SendProxyProtocol::new(
            client_stream,
            Token(0),
            Ulid::generate(),
            Some(backend_stream),
        );
        let mut session_metrics = SessionMetrics::new(None);

        send_pp.set_back_connected(BackendConnectionStatus::Connected);

        loop {
            match send_pp.back_writable(&mut session_metrics) {
                SessionResult::Upgrade => break,
                SessionResult::Continue => continue,
                result => panic!("state machine error: result = {result:?}"),
            }
        }
    }

    /// Spawns a client that connects to `addr` once the start barrier opens
    /// and keeps the connection alive until the end barrier opens.
    fn start_client(addr: SocketAddr, barrier: Arc<Barrier>, end_barrier: Arc<Barrier>) {
        thread::spawn(move || {
            barrier.wait();

            let _stream = StdTcpStream::connect(addr).unwrap();

            end_barrier.wait();
        });
    }

    /// Spawns the backend: accepts one connection, reads
    /// `EXPECTED_HEADER_LEN` bytes and checks they parse as a v2 header.
    ///
    /// The returned thread panics (after releasing the end barrier) on a read
    /// error, on a connection closed before the full header arrived, or on an
    /// unparsable header.
    fn start_backend(
        addr: SocketAddr,
        barrier: Arc<Barrier>,
        end_barrier: Arc<Barrier>,
    ) -> JoinHandle<()> {
        let listener = StdTcpListener::bind(addr).expect("could not start backend");

        thread::spawn(move || {
            barrier.wait();

            let mut buf = [0u8; EXPECTED_HEADER_LEN];
            let (mut conn, _) = listener
                .accept()
                .expect("could not accept connection from light middleware");
            println!("backend got a connection from the middleware");

            let mut index = 0usize;
            while index < EXPECTED_HEADER_LEN {
                match conn.read(&mut buf[index..]) {
                    // The peer closed the connection: no more bytes will ever
                    // arrive, so looping again would spin forever.
                    Ok(0) => {
                        end_barrier.wait();
                        panic!(
                            "connection closed after {} of {} header bytes",
                            index, EXPECTED_HEADER_LEN
                        );
                    }
                    Ok(sz) => {
                        println!("backend read {sz} bytes");
                        index += sz;
                    }
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock => continue,
                        kind => {
                            end_barrier.wait();
                            panic!("read error: {kind:?}");
                        }
                    },
                }
            }

            match parse_v2_header(&buf) {
                Ok((_, _)) => println!("complete header received"),
                err => {
                    end_barrier.wait();
                    panic!("incorrect proxy protocol header received: {err:?}");
                }
            };

            end_barrier.wait();
        })
    }
}