sozu_lib/protocol/proxy_protocol/
relay.rs1use std::{cell::RefCell, io::Write, rc::Rc};
2
3use mio::{Token, net::TcpStream};
4use nom::{Err, Offset};
5use rusty_ulid::Ulid;
6
7use crate::{
8 Protocol, Readiness, SessionMetrics, SessionResult,
9 pool::Checkout,
10 protocol::{
11 pipe::{Pipe, WebSocketContext},
12 proxy_protocol::parser::parse_v2_header,
13 },
14 socket::{SocketHandler, SocketResult},
15 sozu_command::ready::Ready,
16 tcp::TcpListener,
17};
18
19pub struct RelayProxyProtocol<Front: SocketHandler> {
20 cursor_header: usize,
21 pub backend_readiness: Readiness,
22 pub backend_token: Option<Token>,
23 pub backend: Option<TcpStream>,
24 pub frontend_buffer: Checkout,
25 pub frontend_readiness: Readiness,
26 pub frontend_token: Token,
27 pub frontend: Front,
28 pub header_size: Option<usize>,
29 pub request_id: Ulid,
30}
31
32impl<Front: SocketHandler> RelayProxyProtocol<Front> {
33 pub fn new(
37 frontend: Front,
38 frontend_token: Token,
39 request_id: Ulid,
40 backend: Option<TcpStream>,
41 front_buf: Checkout,
42 ) -> Self {
43 RelayProxyProtocol {
44 backend_readiness: Readiness {
45 interest: Ready::HUP | Ready::ERROR,
46 event: Ready::EMPTY,
47 },
48 backend_token: None,
49 backend,
50 cursor_header: 0,
51 frontend_buffer: front_buf,
52 frontend_readiness: Readiness {
53 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
54 event: Ready::EMPTY,
55 },
56 frontend_token,
57 frontend,
58 header_size: None,
59 request_id,
60 }
61 }
62
63 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
64 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
65 debug!(
66 "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}",
67 self.frontend_token, sz, res
68 );
69
70 if sz > 0 {
71 self.frontend_buffer.fill(sz);
72
73 count!("bytes_in", sz as i64);
74 metrics.bin += sz;
75
76 if res == SocketResult::Error {
77 error!(
78 "[{:?}] front socket error, closing the connection",
79 self.frontend_token
80 );
81 incr!("proxy_protocol.errors");
82 self.frontend_readiness.reset();
83 self.backend_readiness.reset();
84 return SessionResult::Close;
85 }
86
87 if res == SocketResult::WouldBlock {
88 self.frontend_readiness.event.remove(Ready::READABLE);
89 }
90
91 let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
92 Ok((rest, _)) => {
93 self.frontend_readiness.interest.remove(Ready::READABLE);
94 self.backend_readiness.interest.insert(Ready::WRITABLE);
95 self.frontend_buffer.data().offset(rest)
96 }
97 Err(Err::Incomplete(_)) => return SessionResult::Continue,
98 Err(e) => {
99 error!(
100 "[{:?}] error parsing the proxy protocol header(error={:?}), closing the connection",
101 self.frontend_token, e
102 );
103 return SessionResult::Close;
104 }
105 };
106
107 self.header_size = Some(read_sz);
108 self.frontend_buffer.consume(sz);
109 return SessionResult::Continue;
110 }
111
112 SessionResult::Continue
113 }
114
115 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
118 debug!("Writing proxy protocol header");
119
120 if let Some(ref mut socket) = self.backend {
121 if let Some(ref header_size) = self.header_size {
122 loop {
123 match socket.write(self.frontend_buffer.data()) {
124 Ok(sz) => {
125 self.cursor_header += sz;
126
127 metrics.backend_bout += sz;
128 self.frontend_buffer.consume(sz);
129
130 if self.cursor_header >= *header_size {
131 info!("Proxy protocol sent, upgrading");
132 return SessionResult::Upgrade;
133 }
134 }
135 Err(e) => {
136 incr!("proxy_protocol.errors");
137 self.frontend_readiness.reset();
138 self.backend_readiness.reset();
139 debug!("PROXY PROTOCOL {}", e);
140 break;
141 }
142 }
143 }
144 }
145 }
146 SessionResult::Continue
147 }
148
149 pub fn front_socket(&self) -> &TcpStream {
150 self.frontend.socket_ref()
151 }
152
153 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
154 self.frontend.socket_mut()
155 }
156
157 pub fn back_socket(&self) -> Option<&TcpStream> {
158 self.backend.as_ref()
159 }
160
161 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
162 self.backend.as_mut()
163 }
164
165 pub fn set_back_socket(&mut self, socket: TcpStream) {
166 self.backend = Some(socket);
167 }
168
169 pub fn back_token(&self) -> Option<Token> {
170 self.backend_token
171 }
172
173 pub fn set_back_token(&mut self, token: Token) {
174 self.backend_token = Some(token);
175 }
176
177 pub fn into_pipe(
178 mut self,
179 back_buf: Checkout,
180 listener: Rc<RefCell<TcpListener>>,
181 ) -> Pipe<Front, TcpListener> {
182 let backend_socket = self.backend.take().unwrap();
183 let addr = self.front_socket().peer_addr().ok();
184
185 let mut pipe = Pipe::new(
186 back_buf,
187 None,
188 Some(backend_socket),
189 None,
190 None,
191 None,
192 None,
193 self.frontend_buffer,
194 self.frontend_token,
195 self.frontend,
196 listener,
197 Protocol::TCP,
198 self.request_id,
199 addr,
200 WebSocketContext::Tcp,
201 );
202
203 pipe.frontend_readiness.event = self.frontend_readiness.event;
204 pipe.backend_readiness.event = self.backend_readiness.event;
205
206 if let Some(back_token) = self.backend_token {
207 pipe.set_back_token(back_token);
208 }
209
210 pipe
211 }
212}