sozu_lib/protocol/h2/
mod.rs

//FIXME: we temporarily silence all warnings for the HTTP/2 module
2#![allow(warnings)]
3use std::{cell::RefCell, net::SocketAddr, rc::Weak};
4
5use mio::{net::TcpStream, *};
6use rusty_ulid::Ulid;
7
8use crate::{
9    Protocol, Readiness, SessionMetrics, StateResult,
10    pool::{Checkout, Pool},
11    socket::{SocketHandler, SocketResult},
12    sozu_command::{buffer::fixed::Buffer, ready::Ready},
13};
14
15mod parser;
16mod serializer;
17mod state;
18mod stream;
19
/// Alias of `Token` used to name the token of a backend connection.
// NOTE(review): `Token` presumably comes from the `mio::*` glob import — confirm.
type BackendToken = Token;
21
/// Status of an HTTP/2 session.
///
/// The enum is a plain, field-less tag, so it derives the full set of cheap
/// common traits: it can be logged with `{:?}`, compared in `assert_eq!`, and
/// copied freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    /// Regular operation.
    Normal,
    /// A default answer is being served.
    DefaultAnswer,
}
27
28pub struct Http2<Front: SocketHandler> {
29    pub frontend: Connection<Front>,
30    backend: Option<TcpStream>,
31    frontend_token: Token,
32    backend_token: Option<Token>,
33    back_buf: Option<Checkout>,
34    pub cluster_id: Option<String>,
35    pub request_id: Ulid,
36    pub back_readiness: Readiness,
37    pub log_ctx: String,
38    public_address: Option<SocketAddr>,
39    pub state: Option<state::State>,
40    pool: Weak<RefCell<Pool>>,
41}
42
43impl<Front: SocketHandler> Http2<Front> {
44    pub fn new(
45        frontend: Front,
46        frontend_token: Token,
47        pool: Weak<RefCell<Pool>>,
48        public_address: Option<SocketAddr>,
49        client_address: Option<SocketAddr>,
50        sticky_name: String,
51    ) -> Http2<Front> {
52        let request_id = Ulid::generate();
53        let log_ctx = format!("{}\tunknown\t", &request_id);
54        let (read, write) = {
55            let p0 = pool.upgrade().unwrap();
56            let mut p = p0.borrow_mut();
57            let res = (p.checkout().unwrap(), p.checkout().unwrap());
58            res
59        };
60        let session = Http2 {
61            frontend: Connection::new(frontend, read, write),
62            frontend_token,
63            backend: None,
64            backend_token: None,
65            back_buf: None,
66            cluster_id: None,
67            state: Some(state::State::new()),
68            request_id,
69            back_readiness: Readiness {
70                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
71                event: Ready::EMPTY,
72            },
73            log_ctx,
74            public_address,
75            pool,
76        };
77
78        trace!("created http2");
79        session
80    }
81
82    pub fn front_socket(&self) -> &TcpStream {
83        self.frontend.socket.socket_ref()
84    }
85
86    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
87        self.frontend.socket.socket_mut()
88    }
89
90    pub fn back_socket(&self) -> Option<&TcpStream> {
91        self.backend.as_ref()
92    }
93
94    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
95        self.backend.as_mut()
96    }
97
98    pub fn set_back_socket(&mut self, socket: TcpStream) {
99        self.backend = Some(socket);
100    }
101
102    pub fn back_token(&self) -> Option<Token> {
103        self.backend_token
104    }
105
106    pub fn log_context(&self) -> String {
107        if let Some(ref cluster_id) = self.cluster_id {
108            format!("{}\t{}\t", self.request_id, cluster_id)
109        } else {
110            format!("{}\tunknown\t", self.request_id)
111        }
112    }
113
114    pub fn set_back_token(&mut self, token: Token) {
115        self.backend_token = Some(token);
116    }
117
118    pub fn front_hup(&mut self) -> StateResult {
119        StateResult::CloseSession
120    }
121
122    pub fn back_hup(&mut self) -> StateResult {
123        error!("todo[{}:{}]: back_hup", file!(), line!());
124        StateResult::CloseSession
125        /*
126        if self.back_buf.output_data_size() == 0 || self.back_buf.next_output_data().len() == 0 {
127          if self.back_readiness.event.is_readable() {
128            self.back_readiness().interest.insert(Ready::READABLE);
129            error!("Http2::back_hup: backend connection closed but the kernel still holds some data. readiness: {:?} -> {:?}", self.frontend.readiness, self.back_readiness);
130            SessionResult::Continue
131          } else {
132            SessionResult::CloseSession
133          }
134        } else {
135          self.frontend.readiness().interest.insert(Ready::WRITABLE);
136          if self.back_readiness.event.is_readable() {
137            self.back_readiness.interest.insert(Ready::READABLE);
138          }
139          SessionResult::Continue
140        }
141        */
142    }
143
144    // Read content from the session
145    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
146        trace!("http2 readable");
147        error!("todo[{}:{}]: readable", file!(), line!());
148
149        /* do not handle buffer pooling for now
150        if self.front_buf.is_none() {
151          if let Some(p) = self.pool.upgrade() {
152            if let Some(buf) = p.borrow_mut().checkout() {
153              self.front_buf = Some(buf);
154            } else {
155              error!("cannot get front buffer from pool, closing");
156              return SessionResult::CloseSession;
157            }
158          }
159        }
160        */
161
162        if self.frontend.read_buffer.available_space() == 0 {
163            if self.backend_token == None {
164                //let answer_413 = "HTTP/1.1 413 Payload Too Large\r\nContent-Length: 0\r\n\r\n";
165                //self.set_answer(DefaultAnswerStatus::Answer413, Rc::new(Vec::from(answer_413.as_bytes())));
166                self.frontend.readiness.interest.remove(Ready::READABLE);
167                self.frontend.readiness.interest.insert(Ready::WRITABLE);
168            } else {
169                self.frontend.readiness.interest.remove(Ready::READABLE);
170                self.back_readiness.interest.insert(Ready::WRITABLE);
171            }
172            return StateResult::Continue;
173        }
174
175        let res = self.frontend.read(metrics);
176
177        match res {
178            SocketResult::Error => {
179                let front_readiness = self.frontend.readiness.clone();
180                let back_readiness = self.back_readiness.clone();
181                error!(
182                    "front socket error, closing the connection. Readiness: {:?} -> {:?}",
183                    front_readiness, back_readiness
184                );
185                return StateResult::CloseSession;
186            }
187            SocketResult::Closed => {
188                return StateResult::CloseSession;
189            }
190            SocketResult::WouldBlock => {}
191            SocketResult::Continue => {}
192        };
193
194        self.readable_parse(metrics)
195    }
196
197    pub fn readable_parse(&mut self, metrics: &mut SessionMetrics) -> StateResult {
198        let mut state = self.state.take().unwrap();
199        let (sz, cont) = { state.parse_and_handle(self.frontend.read_buffer.data()) };
200        self.frontend.read_buffer.consume(sz);
201        self.frontend.readiness.interest = state.interest;
202        self.state = Some(state);
203
204        match cont {
205            state::FrameResult::Close => StateResult::CloseSession,
206            state::FrameResult::Continue => StateResult::Continue,
207            state::FrameResult::ConnectBackend(id) => StateResult::ConnectBackend,
208        }
209
210        /*let is_initial = unwrap_msg!(self.state.as_ref()).request == Some(RequestState::Initial);
211        // if there's no host, continue parsing until we find it
212        let has_host = unwrap_msg!(self.state.as_ref()).has_host();
213        if !has_host {
214          self.state = Some(parse_request_until_stop(unwrap_msg!(self.state.take()), &self.request_id,
215            &mut self.front_buf.as_mut().unwrap(), &self.sticky_name));
216          if unwrap_msg!(self.state.as_ref()).is_front_error() {
217            self.log_request_error(metrics, "front parsing error, closing the connection");
218            incr!("http.front_parse_errors");
219
220            // increment active requests here because it will be decremented right away
221            // when closing the connection. It's slightly easier than decrementing it
222            // at every place we return SessionResult::CloseSession
223            gauge_add!("http.active_requests", 1);
224
225            return SessionResult::CloseSession;
226          }
227        }
228        */
229    }
230
231    // Forward content to session
232    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
233        trace!("http2 writable");
234        error!("todo[{}:{}]: writable", file!(), line!());
235
236        let mut state = self.state.take().unwrap();
237        //FIXME: do that in a loop until no more frames or WouldBlock
238        match state.r#gen(self.frontend.write_buffer.space()) {
239            Ok(sz) => {
240                self.frontend.write_buffer.fill(sz);
241                //FIXME: use real condition here to indicate there was nothing to write
242                if sz == 0 {}
243            }
244            Err(e) => {
245                self.state = Some(state);
246                error!("error serializing to front write buffer: {:?}", e);
247                return StateResult::CloseSession;
248            }
249        }
250
251        self.frontend.readiness.interest = state.interest;
252
253        let res = self.frontend.write(metrics);
254        match res {
255            SocketResult::Error | SocketResult::Closed => {
256                error!(
257                    "{}\t[{:?}] error writing to front socket, closing",
258                    self.log_ctx, self.frontend_token
259                );
260                incr!("http2.errors");
261                metrics.service_stop();
262                self.frontend.readiness.reset();
263                self.back_readiness.reset();
264                self.state = Some(state);
265                return StateResult::CloseSession;
266            }
267            SocketResult::WouldBlock => {}
268            SocketResult::Continue => {}
269        }
270
271        self.state = Some(state);
272        StateResult::Continue
273    }
274
275    // Forward content to cluster
276    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
277        trace!("http2 back_writable");
278        error!("todo[{}:{}]: back_writable", file!(), line!());
279        StateResult::CloseSession
280        /*
281        if self.front_buf.output_data_size() == 0 || self.front_buf.next_output_data().len() == 0 {
282          self.frontend.readiness.interest.insert(Ready::READABLE);
283          self.back_readiness.interest.remove(Ready::WRITABLE);
284          return SessionResult::Continue;
285        }
286
287        let tokens = self.tokens().clone();
288        let output_size = self.front_buf.output_data_size();
289
290        let mut sz = 0usize;
291        let mut socket_res = SocketResult::Continue;
292
293        if let Some(ref mut backend) = self.backend {
294          while socket_res == SocketResult::Continue && self.front_buf.output_data_size() > 0 {
295            // no more data in buffer, stop here
296            if self.front_buf.next_output_data().len() == 0 {
297              self.frontend.readiness.interest.insert(Ready::READABLE);
298              self.back_readiness.interest.remove(Ready::WRITABLE);
299              return SessionResult::Continue;
300            }
301
302            let (current_sz, current_res) = backend.socket_write(self.front_buf.next_output_data());
303            socket_res = current_res;
304            self.front_buf.consume_output_data(current_sz);
305            self.front_buf_position += current_sz;
306            sz += current_sz;
307          }
308        }
309
310        metrics.backend_bout += sz;
311
312        if let Some((front,back)) = tokens {
313          debug!("{}\tBACK [{}->{}]: wrote {} bytes of {}", self.log_ctx, front.0, back.0, sz, output_size);
314        }
315        match socket_res {
316          SocketResult::Error | SocketResult::Closed => {
317            error!("{}\tback socket write error, closing connection", self.log_ctx);
318            metrics.service_stop();
319            incr!("http2.errors");
320            self.frontend.readiness.reset();
321            self.back_readiness.reset();
322            return SessionResult::CloseSession;
323          },
324          SocketResult::WouldBlock => {
325            self.back_readiness.event.remove(Ready::WRITABLE);
326
327          },
328          SocketResult::Continue => {}
329        }
330        SessionResult::Continue
331        */
332    }
333
334    // Read content from cluster
335    pub fn back_readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
336        trace!("http2 back_readable");
337        error!("todo[{}:{}]: back_readable", file!(), line!());
338        StateResult::CloseSession
339        /*
340        if self.back_buf.buffer.available_space() == 0 {
341          self.back_readiness.interest.remove(Ready::READABLE);
342          return SessionResult::Continue;
343        }
344
345        let tokens     = self.tokens().clone();
346
347        if let Some(ref mut backend) = self.backend {
348          let (sz, r) = backend.socket_read(&mut self.back_buf.buffer.space());
349          self.back_buf.buffer.fill(sz);
350          self.back_buf.sliced_input(sz);
351          self.back_buf.consume_parsed_data(sz);
352          self.back_buf.slice_output(sz);
353
354          if let Some((front,back)) = tokens {
355            debug!("{}\tBACK  [{}<-{}]: read {} bytes", self.log_ctx, front.0, back.0, sz);
356          }
357
358          if r != SocketResult::Continue || sz == 0 {
359            self.back_readiness.event.remove(Ready::READABLE);
360          }
361          if sz > 0 {
362            self.frontend.readiness.interest.insert(Ready::WRITABLE);
363            metrics.backend_bin += sz;
364          }
365
366          match r {
367            SocketResult::Error => {
368              error!("{}\tback socket read error, closing connection", self.log_ctx);
369              metrics.service_stop();
370              incr!("http2.errors");
371              self.frontend.readiness.reset();
372              self.back_readiness.reset();
373              return SessionResult::CloseSession;
374            },
375            SocketResult::Closed => {
376              metrics.service_stop();
377              self.frontend.readiness.reset();
378              self.back_readiness.reset();
379              return SessionResult::CloseSession;
380            },
381            SocketResult::WouldBlock => {
382              self.back_readiness.event.remove(Ready::READABLE);
383            },
384            SocketResult::Continue => {}
385          }
386        }
387
388        SessionResult::Continue
389        */
390    }
391}
392
393pub struct Connection<Socket: SocketHandler> {
394    pub socket: Socket,
395    pub readiness: Readiness,
396    pub read_buffer: Checkout,
397    pub write_buffer: Checkout,
398}
399
400impl<Socket: SocketHandler> Connection<Socket> {
401    pub fn new(socket: Socket, read_buffer: Checkout, write_buffer: Checkout) -> Self {
402        Connection {
403            socket,
404            readiness: Readiness {
405                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
406                event: Ready::EMPTY,
407            },
408            //FIXME: capacity can be configured
409            read_buffer,
410            //FIXME: capacity can be configured
411            write_buffer,
412        }
413    }
414
415    pub fn read(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
416        let (sz, res) = self.socket.socket_read(self.read_buffer.space());
417
418        if sz > 0 {
419            count!("bytes_in", sz as i64);
420            metrics.bin += sz;
421
422            self.read_buffer.fill(sz);
423
424            if self.read_buffer.available_space() == 0 {
425                self.readiness.interest.remove(Ready::READABLE);
426            }
427        } else {
428            self.readiness.event.remove(Ready::READABLE);
429        }
430
431        if res == SocketResult::WouldBlock {
432            self.readiness.event.remove(Ready::READABLE);
433        }
434
435        res
436    }
437
438    pub fn write(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
439        let mut sz = 0usize;
440        let mut res = SocketResult::Continue;
441        while res == SocketResult::Continue && self.write_buffer.available_data() > 0 {
442            let (current_sz, current_res) = self.socket.socket_write(self.write_buffer.data());
443            res = current_res;
444            self.write_buffer.consume(current_sz);
445            sz += current_sz;
446        }
447
448        if sz > 0 {
449            count!("bytes_out", sz as i64);
450            metrics.bout += sz;
451        }
452
453        if res == SocketResult::WouldBlock {
454            self.readiness.event.remove(Ready::WRITABLE);
455        }
456
457        res
458    }
459}