sozu_lib/protocol/
rustls.rs

1use std::{cell::RefCell, io::ErrorKind, net::SocketAddr, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use rustls::ServerConnection;
5use rusty_ulid::Ulid;
6use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};
7
8use crate::{
9    protocol::SessionState, timer::TimeoutContainer, Readiness, Ready, SessionMetrics,
10    SessionResult, StateResult,
11};
12
/// Builds the log-line prefix used throughout this module so that TLS issues
/// inside Sōzu can be traced back to a given session.
///
/// The prefix carries the state's `log_context()`, the SNI reported by the
/// TLS session, the peer address, the frontend token and the frontend
/// readiness. A missing SNI or peer address is rendered as `<none>`.
macro_rules! log_context {
    ($state:expr) => {
        format!(
            "RUSTLS\t{}\tSession(sni={:?}, source={:?}, frontend={}, readiness={})\t >>>",
            $state.log_context(),
            $state
                .session
                .server_name()
                .map_or_else(|| "<none>".to_string(), |name| name.to_string()),
            $state
                .peer_address
                .map_or_else(|| "<none>".to_string(), |peer| peer.to_string()),
            $state.frontend_token.0,
            $state.frontend_readiness
        )
    };
}
34
/// Coarse lifecycle stage of a TLS connection.
///
/// The variants carry no data, so the type is cheap to copy and compare and
/// can be printed in logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TlsState {
    /// Starting state.
    Initial,
    /// The handshake is in progress.
    Handshake,
    /// The handshake completed.
    Established,
    /// The connection failed.
    Error,
}
41
/// Session state of a frontend connection while its TLS handshake is being
/// performed with rustls.
pub struct TlsHandshake {
    /// Frontend timeout; marked as triggered in `timeout` and cancelled in
    /// `cancel_timeouts`.
    pub container_frontend_timeout: TimeoutContainer,
    /// Interest and received events for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Token of the frontend socket; incoming tokens are compared against it
    /// in `update_readiness` and `timeout`.
    frontend_token: Token,
    /// Address of the remote peer, if known; within this file it is only used
    /// for logging.
    pub peer_address: Option<SocketAddr>,
    /// Identifier copied into the `LogContext` of this state.
    pub request_id: Ulid,
    /// rustls server-side connection driving the handshake.
    pub session: ServerConnection,
    /// TCP stream the TLS records are read from and written to.
    pub stream: TcpStream,
}
51
52impl TlsHandshake {
53    /// Instantiate a new TlsHandshake SessionState with:
54    ///
55    /// - frontend_interest: READABLE | HUP | ERROR
56    /// - frontend_event: EMPTY
57    ///
58    /// Remember to set the events from the previous State!
59    pub fn new(
60        container_frontend_timeout: TimeoutContainer,
61        session: ServerConnection,
62        stream: TcpStream,
63        frontend_token: Token,
64        request_id: Ulid,
65        peer_address: Option<SocketAddr>,
66    ) -> TlsHandshake {
67        TlsHandshake {
68            container_frontend_timeout,
69            frontend_readiness: Readiness {
70                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
71                event: Ready::EMPTY,
72            },
73            frontend_token,
74            peer_address,
75            request_id,
76            session,
77            stream,
78        }
79    }
80
81    pub fn readable(&mut self) -> SessionResult {
82        let mut can_read = true;
83
84        loop {
85            let mut can_work = false;
86
87            if self.session.wants_read() && can_read {
88                can_work = true;
89
90                match self.session.read_tls(&mut self.stream) {
91                    Ok(0) => {
92                        error!("{} Connection closed during handshake", log_context!(self));
93                        return SessionResult::Close;
94                    }
95                    Ok(_) => {}
96                    Err(e) => match e.kind() {
97                        ErrorKind::WouldBlock => {
98                            self.frontend_readiness.event.remove(Ready::READABLE);
99                            can_read = false
100                        }
101                        _ => {
102                            error!(
103                                "{} Could not perform handshake: {:?}",
104                                log_context!(self),
105                                e
106                            );
107                            return SessionResult::Close;
108                        }
109                    },
110                }
111
112                if let Err(e) = self.session.process_new_packets() {
113                    error!(
114                        "{} Could not perform handshake: {:?}",
115                        log_context!(self),
116                        e
117                    );
118                    return SessionResult::Close;
119                }
120            }
121
122            if !can_work {
123                break;
124            }
125        }
126
127        if !self.session.wants_read() {
128            self.frontend_readiness.interest.remove(Ready::READABLE);
129        }
130
131        if self.session.wants_write() {
132            self.frontend_readiness.interest.insert(Ready::WRITABLE);
133        }
134
135        if self.session.is_handshaking() {
136            SessionResult::Continue
137        } else {
138            // handshake might be finished, but we still have something to send
139            if self.session.wants_write() {
140                SessionResult::Continue
141            } else {
142                self.frontend_readiness.interest.insert(Ready::READABLE);
143                self.frontend_readiness.event.insert(Ready::READABLE);
144                self.frontend_readiness.interest.insert(Ready::WRITABLE);
145                SessionResult::Upgrade
146            }
147        }
148    }
149
150    pub fn writable(&mut self) -> SessionResult {
151        let mut can_write = true;
152
153        loop {
154            let mut can_work = false;
155
156            if self.session.wants_write() && can_write {
157                can_work = true;
158
159                match self.session.write_tls(&mut self.stream) {
160                    Ok(_) => {}
161                    Err(e) => match e.kind() {
162                        ErrorKind::WouldBlock => {
163                            self.frontend_readiness.event.remove(Ready::WRITABLE);
164                            can_write = false
165                        }
166                        _ => {
167                            error!(
168                                "{} Could not perform handshake: {:?}",
169                                log_context!(self),
170                                e
171                            );
172                            return SessionResult::Close;
173                        }
174                    },
175                }
176
177                if let Err(e) = self.session.process_new_packets() {
178                    error!(
179                        "{} Could not perform handshake: {:?}",
180                        log_context!(self),
181                        e
182                    );
183                    return SessionResult::Close;
184                }
185            }
186
187            if !can_work {
188                break;
189            }
190        }
191
192        if !self.session.wants_write() {
193            self.frontend_readiness.interest.remove(Ready::WRITABLE);
194        }
195
196        if self.session.wants_read() {
197            self.frontend_readiness.interest.insert(Ready::READABLE);
198        }
199
200        if self.session.is_handshaking() {
201            SessionResult::Continue
202        } else if self.session.wants_read() {
203            self.frontend_readiness.interest.insert(Ready::READABLE);
204            SessionResult::Upgrade
205        } else {
206            self.frontend_readiness.interest.insert(Ready::WRITABLE);
207            self.frontend_readiness.interest.insert(Ready::READABLE);
208            SessionResult::Upgrade
209        }
210    }
211
212    pub fn log_context(&self) -> LogContext {
213        LogContext {
214            request_id: self.request_id,
215            cluster_id: None,
216            backend_id: None,
217        }
218    }
219
220    pub fn front_socket(&self) -> &TcpStream {
221        &self.stream
222    }
223}
224
225impl SessionState for TlsHandshake {
226    fn ready(
227        &mut self,
228        _session: Rc<RefCell<dyn crate::ProxySession>>,
229        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
230        _metrics: &mut SessionMetrics,
231    ) -> SessionResult {
232        let mut counter = 0;
233
234        if self.frontend_readiness.event.is_hup() {
235            return SessionResult::Close;
236        }
237
238        while counter < MAX_LOOP_ITERATIONS {
239            let frontend_interest = self.frontend_readiness.filter_interest();
240
241            trace!("{} Interest({:?})", log_context!(self), frontend_interest);
242            if frontend_interest.is_empty() {
243                break;
244            }
245
246            if frontend_interest.is_readable() {
247                let protocol_result = self.readable();
248                if protocol_result != SessionResult::Continue {
249                    return protocol_result;
250                }
251            }
252
253            if frontend_interest.is_writable() {
254                let protocol_result = self.writable();
255                if protocol_result != SessionResult::Continue {
256                    return protocol_result;
257                }
258            }
259
260            if frontend_interest.is_error() {
261                error!("{} Front socket error, disconnecting", log_context!(self));
262                self.frontend_readiness.interest = Ready::EMPTY;
263                return SessionResult::Close;
264            }
265
266            counter += 1;
267        }
268
269        if counter >= MAX_LOOP_ITERATIONS {
270            error!(
271                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
272                 log_context!(self), MAX_LOOP_ITERATIONS
273            );
274
275            incr!("http.infinite_loop.error");
276            self.print_state("HTTPS");
277
278            return SessionResult::Close;
279        }
280
281        SessionResult::Continue
282    }
283
284    fn update_readiness(&mut self, token: Token, events: Ready) {
285        if self.frontend_token == token {
286            self.frontend_readiness.event |= events;
287        }
288    }
289
290    fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult {
291        // relevant timeout is still stored in the Session as front_timeout.
292        if self.frontend_token == token {
293            self.container_frontend_timeout.triggered();
294            return StateResult::CloseSession;
295        }
296
297        error!(
298            "{}, Expect state: got timeout for an invalid token: {:?}",
299            log_context!(self),
300            token
301        );
302        StateResult::CloseSession
303    }
304
305    fn cancel_timeouts(&mut self) {
306        self.container_frontend_timeout.cancel();
307    }
308
309    fn print_state(&self, context: &str) {
310        error!(
311            "{} Session(Handshake)\n\tFrontend:\n\t\ttoken: {:?}\treadiness: {:?}",
312            context, self.frontend_token, self.frontend_readiness
313        );
314    }
315}