sozu_lib/protocol/proxy_protocol/
relay.rs1use std::{cell::RefCell, io::Write, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use nom::{Err, Offset};
5use rusty_ulid::Ulid;
6
7use crate::{
8 pool::Checkout,
9 protocol::{
10 pipe::{Pipe, WebSocketContext},
11 proxy_protocol::parser::parse_v2_header,
12 },
13 socket::{SocketHandler, SocketResult},
14 sozu_command::ready::Ready,
15 tcp::TcpListener,
16 Protocol, Readiness, SessionMetrics, SessionResult,
17};
18
19pub struct RelayProxyProtocol<Front: SocketHandler> {
20 cursor_header: usize,
21 pub backend_readiness: Readiness,
22 pub backend_token: Option<Token>,
23 pub backend: Option<TcpStream>,
24 pub frontend_buffer: Checkout,
25 pub frontend_readiness: Readiness,
26 pub frontend_token: Token,
27 pub frontend: Front,
28 pub header_size: Option<usize>,
29 pub request_id: Ulid,
30}
31
32impl<Front: SocketHandler> RelayProxyProtocol<Front> {
33 pub fn new(
37 frontend: Front,
38 frontend_token: Token,
39 request_id: Ulid,
40 backend: Option<TcpStream>,
41 front_buf: Checkout,
42 ) -> Self {
43 RelayProxyProtocol {
44 backend_readiness: Readiness {
45 interest: Ready::HUP | Ready::ERROR,
46 event: Ready::EMPTY,
47 },
48 backend_token: None,
49 backend,
50 cursor_header: 0,
51 frontend_buffer: front_buf,
52 frontend_readiness: Readiness {
53 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
54 event: Ready::EMPTY,
55 },
56 frontend_token,
57 frontend,
58 header_size: None,
59 request_id,
60 }
61 }
62
63 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
64 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
65 debug!(
66 "FRONT proxy protocol [{:?}]: read {} bytes and res={:?}",
67 self.frontend_token, sz, res
68 );
69
70 if sz > 0 {
71 self.frontend_buffer.fill(sz);
72
73 count!("bytes_in", sz as i64);
74 metrics.bin += sz;
75
76 if res == SocketResult::Error {
77 error!(
78 "[{:?}] front socket error, closing the connection",
79 self.frontend_token
80 );
81 incr!("proxy_protocol.errors");
82 self.frontend_readiness.reset();
83 self.backend_readiness.reset();
84 return SessionResult::Close;
85 }
86
87 if res == SocketResult::WouldBlock {
88 self.frontend_readiness.event.remove(Ready::READABLE);
89 }
90
91 let read_sz = match parse_v2_header(self.frontend_buffer.data()) {
92 Ok((rest, _)) => {
93 self.frontend_readiness.interest.remove(Ready::READABLE);
94 self.backend_readiness.interest.insert(Ready::WRITABLE);
95 self.frontend_buffer.data().offset(rest)
96 }
97 Err(Err::Incomplete(_)) => return SessionResult::Continue,
98 Err(e) => {
99 error!("[{:?}] error parsing the proxy protocol header(error={:?}), closing the connection",
100 self.frontend_token, e);
101 return SessionResult::Close;
102 }
103 };
104
105 self.header_size = Some(read_sz);
106 self.frontend_buffer.consume(sz);
107 return SessionResult::Continue;
108 }
109
110 SessionResult::Continue
111 }
112
113 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
116 debug!("Writing proxy protocol header");
117
118 if let Some(ref mut socket) = self.backend {
119 if let Some(ref header_size) = self.header_size {
120 loop {
121 match socket.write(self.frontend_buffer.data()) {
122 Ok(sz) => {
123 self.cursor_header += sz;
124
125 metrics.backend_bout += sz;
126 self.frontend_buffer.consume(sz);
127
128 if self.cursor_header >= *header_size {
129 info!("Proxy protocol sent, upgrading");
130 return SessionResult::Upgrade;
131 }
132 }
133 Err(e) => {
134 incr!("proxy_protocol.errors");
135 self.frontend_readiness.reset();
136 self.backend_readiness.reset();
137 debug!("PROXY PROTOCOL {}", e);
138 break;
139 }
140 }
141 }
142 }
143 }
144 SessionResult::Continue
145 }
146
147 pub fn front_socket(&self) -> &TcpStream {
148 self.frontend.socket_ref()
149 }
150
151 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
152 self.frontend.socket_mut()
153 }
154
155 pub fn back_socket(&self) -> Option<&TcpStream> {
156 self.backend.as_ref()
157 }
158
159 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
160 self.backend.as_mut()
161 }
162
163 pub fn set_back_socket(&mut self, socket: TcpStream) {
164 self.backend = Some(socket);
165 }
166
167 pub fn back_token(&self) -> Option<Token> {
168 self.backend_token
169 }
170
171 pub fn set_back_token(&mut self, token: Token) {
172 self.backend_token = Some(token);
173 }
174
175 pub fn into_pipe(
176 mut self,
177 back_buf: Checkout,
178 listener: Rc<RefCell<TcpListener>>,
179 ) -> Pipe<Front, TcpListener> {
180 let backend_socket = self.backend.take().unwrap();
181 let addr = self.front_socket().peer_addr().ok();
182
183 let mut pipe = Pipe::new(
184 back_buf,
185 None,
186 Some(backend_socket),
187 None,
188 None,
189 None,
190 None,
191 self.frontend_buffer,
192 self.frontend_token,
193 self.frontend,
194 listener,
195 Protocol::TCP,
196 self.request_id,
197 addr,
198 WebSocketContext::Tcp,
199 );
200
201 pipe.frontend_readiness.event = self.frontend_readiness.event;
202 pipe.backend_readiness.event = self.backend_readiness.event;
203
204 if let Some(back_token) = self.backend_token {
205 pipe.set_back_token(back_token);
206 }
207
208 pipe
209 }
210}