sozu_lib/protocol/
rustls.rs1use std::{cell::RefCell, io::ErrorKind, net::SocketAddr, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use rustls::ServerConnection;
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use crate::{
9 protocol::SessionState, timer::TimeoutContainer, Readiness, Ready, SessionMetrics,
10 SessionResult, StateResult,
11};
12
/// Builds the log prefix used by every message emitted from the handshake state.
///
/// The prefix contains, in order: the value returned by `log_context()`, the
/// SNI reported by the rustls session, the peer address, the frontend token
/// and the frontend readiness. A missing SNI or peer address is rendered as
/// `"<none>"`.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "RUSTLS\t{}\tSession(sni={:?}, source={:?}, frontend={}, readiness={})\t >>>",
            $self.log_context(),
            $self
                .session
                .server_name()
                .map_or_else(|| "<none>".to_string(), |sni| sni.to_string()),
            $self
                .peer_address
                .map_or_else(|| "<none>".to_string(), |peer| peer.to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness
        )
    };
}
34
/// Phases of a TLS connection.
///
/// NOTE(review): nothing in this part of the file reads or writes this enum;
/// the variant descriptions below are inferred from their names — confirm
/// against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TlsState {
    /// Presumably: no TLS data exchanged yet.
    Initial,
    /// Presumably: handshake in progress.
    Handshake,
    /// Presumably: handshake completed.
    Established,
    /// Presumably: the connection failed.
    Error,
}
41
42pub struct TlsHandshake {
43 pub container_frontend_timeout: TimeoutContainer,
44 pub frontend_readiness: Readiness,
45 frontend_token: Token,
46 pub peer_address: Option<SocketAddr>,
47 pub request_id: Ulid,
48 pub session: ServerConnection,
49 pub stream: TcpStream,
50}
51
52impl TlsHandshake {
53 pub fn new(
60 container_frontend_timeout: TimeoutContainer,
61 session: ServerConnection,
62 stream: TcpStream,
63 frontend_token: Token,
64 request_id: Ulid,
65 peer_address: Option<SocketAddr>,
66 ) -> TlsHandshake {
67 TlsHandshake {
68 container_frontend_timeout,
69 frontend_readiness: Readiness {
70 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
71 event: Ready::EMPTY,
72 },
73 frontend_token,
74 peer_address,
75 request_id,
76 session,
77 stream,
78 }
79 }
80
81 pub fn readable(&mut self) -> SessionResult {
82 let mut can_read = true;
83
84 loop {
85 let mut can_work = false;
86
87 if self.session.wants_read() && can_read {
88 can_work = true;
89
90 match self.session.read_tls(&mut self.stream) {
91 Ok(0) => {
92 error!("{} Connection closed during handshake", log_context!(self));
93 return SessionResult::Close;
94 }
95 Ok(_) => {}
96 Err(e) => match e.kind() {
97 ErrorKind::WouldBlock => {
98 self.frontend_readiness.event.remove(Ready::READABLE);
99 can_read = false
100 }
101 _ => {
102 error!(
103 "{} Could not perform handshake: {:?}",
104 log_context!(self),
105 e
106 );
107 return SessionResult::Close;
108 }
109 },
110 }
111
112 if let Err(e) = self.session.process_new_packets() {
113 error!(
114 "{} Could not perform handshake: {:?}",
115 log_context!(self),
116 e
117 );
118 return SessionResult::Close;
119 }
120 }
121
122 if !can_work {
123 break;
124 }
125 }
126
127 if !self.session.wants_read() {
128 self.frontend_readiness.interest.remove(Ready::READABLE);
129 }
130
131 if self.session.wants_write() {
132 self.frontend_readiness.interest.insert(Ready::WRITABLE);
133 }
134
135 if self.session.is_handshaking() {
136 SessionResult::Continue
137 } else {
138 if self.session.wants_write() {
140 SessionResult::Continue
141 } else {
142 self.frontend_readiness.interest.insert(Ready::READABLE);
143 self.frontend_readiness.event.insert(Ready::READABLE);
144 self.frontend_readiness.interest.insert(Ready::WRITABLE);
145 SessionResult::Upgrade
146 }
147 }
148 }
149
150 pub fn writable(&mut self) -> SessionResult {
151 let mut can_write = true;
152
153 loop {
154 let mut can_work = false;
155
156 if self.session.wants_write() && can_write {
157 can_work = true;
158
159 match self.session.write_tls(&mut self.stream) {
160 Ok(_) => {}
161 Err(e) => match e.kind() {
162 ErrorKind::WouldBlock => {
163 self.frontend_readiness.event.remove(Ready::WRITABLE);
164 can_write = false
165 }
166 _ => {
167 error!(
168 "{} Could not perform handshake: {:?}",
169 log_context!(self),
170 e
171 );
172 return SessionResult::Close;
173 }
174 },
175 }
176
177 if let Err(e) = self.session.process_new_packets() {
178 error!(
179 "{} Could not perform handshake: {:?}",
180 log_context!(self),
181 e
182 );
183 return SessionResult::Close;
184 }
185 }
186
187 if !can_work {
188 break;
189 }
190 }
191
192 if !self.session.wants_write() {
193 self.frontend_readiness.interest.remove(Ready::WRITABLE);
194 }
195
196 if self.session.wants_read() {
197 self.frontend_readiness.interest.insert(Ready::READABLE);
198 }
199
200 if self.session.is_handshaking() {
201 SessionResult::Continue
202 } else if self.session.wants_read() {
203 self.frontend_readiness.interest.insert(Ready::READABLE);
204 SessionResult::Upgrade
205 } else {
206 self.frontend_readiness.interest.insert(Ready::WRITABLE);
207 self.frontend_readiness.interest.insert(Ready::READABLE);
208 SessionResult::Upgrade
209 }
210 }
211
212 pub fn log_context(&self) -> LogContext {
213 LogContext {
214 request_id: self.request_id,
215 cluster_id: None,
216 backend_id: None,
217 }
218 }
219
220 pub fn front_socket(&self) -> &TcpStream {
221 &self.stream
222 }
223}
224
225impl SessionState for TlsHandshake {
226 fn ready(
227 &mut self,
228 _session: Rc<RefCell<dyn crate::ProxySession>>,
229 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
230 _metrics: &mut SessionMetrics,
231 ) -> SessionResult {
232 let mut counter = 0;
233
234 if self.frontend_readiness.event.is_hup() {
235 return SessionResult::Close;
236 }
237
238 while counter < MAX_LOOP_ITERATIONS {
239 let frontend_interest = self.frontend_readiness.filter_interest();
240
241 trace!("{} Interest({:?})", log_context!(self), frontend_interest);
242 if frontend_interest.is_empty() {
243 break;
244 }
245
246 if frontend_interest.is_readable() {
247 let protocol_result = self.readable();
248 if protocol_result != SessionResult::Continue {
249 return protocol_result;
250 }
251 }
252
253 if frontend_interest.is_writable() {
254 let protocol_result = self.writable();
255 if protocol_result != SessionResult::Continue {
256 return protocol_result;
257 }
258 }
259
260 if frontend_interest.is_error() {
261 error!("{} Front socket error, disconnecting", log_context!(self));
262 self.frontend_readiness.interest = Ready::EMPTY;
263 return SessionResult::Close;
264 }
265
266 counter += 1;
267 }
268
269 if counter >= MAX_LOOP_ITERATIONS {
270 error!(
271 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
272 log_context!(self), MAX_LOOP_ITERATIONS
273 );
274
275 incr!("http.infinite_loop.error");
276 self.print_state("HTTPS");
277
278 return SessionResult::Close;
279 }
280
281 SessionResult::Continue
282 }
283
284 fn update_readiness(&mut self, token: Token, events: Ready) {
285 if self.frontend_token == token {
286 self.frontend_readiness.event |= events;
287 }
288 }
289
290 fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
291 if self.frontend_token == token {
293 self.container_frontend_timeout.triggered();
294 return StateResult::CloseSession;
295 }
296
297 error!(
298 "{}, Expect state: got timeout for an invalid token: {:?}",
299 log_context!(self),
300 token
301 );
302 StateResult::CloseSession
303 }
304
305 fn cancel_timeouts(&mut self) {
306 self.container_frontend_timeout.cancel();
307 }
308
309 fn print_state(&self, context: &str) {
310 error!(
311 "{} Session(Handshake)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
312 context, self.frontend_token, self.frontend_readiness
313 );
314 }
315}