sozu_lib/protocol/h2/
mod.rs

1//FIXME: we disallow warnings for the HTTP/2 module temporarily
2#![allow(warnings)]
3use std::{cell::RefCell, net::SocketAddr, rc::Weak};
4
5use mio::{net::TcpStream, *};
6use rusty_ulid::Ulid;
7
8use crate::{
9    pool::{Checkout, Pool},
10    socket::{SocketHandler, SocketResult},
11    sozu_command::buffer::fixed::Buffer,
12    sozu_command::ready::Ready,
13    {Protocol, Readiness, SessionMetrics, StateResult},
14};
15
16mod parser;
17mod serializer;
18mod state;
19mod stream;
20
21type BackendToken = Token;
22
23#[derive(PartialEq)]
24pub enum SessionStatus {
25    Normal,
26    DefaultAnswer,
27}
28
29pub struct Http2<Front: SocketHandler> {
30    pub frontend: Connection<Front>,
31    backend: Option<TcpStream>,
32    frontend_token: Token,
33    backend_token: Option<Token>,
34    back_buf: Option<Checkout>,
35    pub cluster_id: Option<String>,
36    pub request_id: Ulid,
37    pub back_readiness: Readiness,
38    pub log_ctx: String,
39    public_address: Option<SocketAddr>,
40    pub state: Option<state::State>,
41    pool: Weak<RefCell<Pool>>,
42}
43
44impl<Front: SocketHandler> Http2<Front> {
45    pub fn new(
46        frontend: Front,
47        frontend_token: Token,
48        pool: Weak<RefCell<Pool>>,
49        public_address: Option<SocketAddr>,
50        client_address: Option<SocketAddr>,
51        sticky_name: String,
52    ) -> Http2<Front> {
53        let request_id = Ulid::generate();
54        let log_ctx = format!("{}\tunknown\t", &request_id);
55        let (read, write) = {
56            let p0 = pool.upgrade().unwrap();
57            let mut p = p0.borrow_mut();
58            let res = (p.checkout().unwrap(), p.checkout().unwrap());
59            res
60        };
61        let session = Http2 {
62            frontend: Connection::new(frontend, read, write),
63            frontend_token,
64            backend: None,
65            backend_token: None,
66            back_buf: None,
67            cluster_id: None,
68            state: Some(state::State::new()),
69            request_id,
70            back_readiness: Readiness {
71                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
72                event: Ready::EMPTY,
73            },
74            log_ctx,
75            public_address,
76            pool,
77        };
78
79        trace!("created http2");
80        session
81    }
82
83    pub fn front_socket(&self) -> &TcpStream {
84        self.frontend.socket.socket_ref()
85    }
86
87    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
88        self.frontend.socket.socket_mut()
89    }
90
91    pub fn back_socket(&self) -> Option<&TcpStream> {
92        self.backend.as_ref()
93    }
94
95    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
96        self.backend.as_mut()
97    }
98
99    pub fn set_back_socket(&mut self, socket: TcpStream) {
100        self.backend = Some(socket);
101    }
102
103    pub fn back_token(&self) -> Option<Token> {
104        self.backend_token
105    }
106
107    pub fn log_context(&self) -> String {
108        if let Some(ref cluster_id) = self.cluster_id {
109            format!("{}\t{}\t", self.request_id, cluster_id)
110        } else {
111            format!("{}\tunknown\t", self.request_id)
112        }
113    }
114
115    pub fn set_back_token(&mut self, token: Token) {
116        self.backend_token = Some(token);
117    }
118
119    pub fn front_hup(&mut self) -> StateResult {
120        StateResult::CloseSession
121    }
122
123    pub fn back_hup(&mut self) -> StateResult {
124        error!("todo[{}:{}]: back_hup", file!(), line!());
125        StateResult::CloseSession
126        /*
127        if self.back_buf.output_data_size() == 0 || self.back_buf.next_output_data().len() == 0 {
128          if self.back_readiness.event.is_readable() {
129            self.back_readiness().interest.insert(Ready::READABLE);
130            error!("Http2::back_hup: backend connection closed but the kernel still holds some data. readiness: {:?} -> {:?}", self.frontend.readiness, self.back_readiness);
131            SessionResult::Continue
132          } else {
133            SessionResult::CloseSession
134          }
135        } else {
136          self.frontend.readiness().interest.insert(Ready::WRITABLE);
137          if self.back_readiness.event.is_readable() {
138            self.back_readiness.interest.insert(Ready::READABLE);
139          }
140          SessionResult::Continue
141        }
142        */
143    }
144
145    // Read content from the session
146    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
147        trace!("http2 readable");
148        error!("todo[{}:{}]: readable", file!(), line!());
149
150        /* do not handle buffer pooling for now
151        if self.front_buf.is_none() {
152          if let Some(p) = self.pool.upgrade() {
153            if let Some(buf) = p.borrow_mut().checkout() {
154              self.front_buf = Some(buf);
155            } else {
156              error!("cannot get front buffer from pool, closing");
157              return SessionResult::CloseSession;
158            }
159          }
160        }
161        */
162
163        if self.frontend.read_buffer.available_space() == 0 {
164            if self.backend_token == None {
165                //let answer_413 = "HTTP/1.1 413 Payload Too Large\r\nContent-Length: 0\r\n\r\n";
166                //self.set_answer(DefaultAnswerStatus::Answer413, Rc::new(Vec::from(answer_413.as_bytes())));
167                self.frontend.readiness.interest.remove(Ready::READABLE);
168                self.frontend.readiness.interest.insert(Ready::WRITABLE);
169            } else {
170                self.frontend.readiness.interest.remove(Ready::READABLE);
171                self.back_readiness.interest.insert(Ready::WRITABLE);
172            }
173            return StateResult::Continue;
174        }
175
176        let res = self.frontend.read(metrics);
177
178        match res {
179            SocketResult::Error => {
180                let front_readiness = self.frontend.readiness.clone();
181                let back_readiness = self.back_readiness.clone();
182                error!(
183                    "front socket error, closing the connection. Readiness: {:?} -> {:?}",
184                    front_readiness, back_readiness
185                );
186                return StateResult::CloseSession;
187            }
188            SocketResult::Closed => {
189                return StateResult::CloseSession;
190            }
191            SocketResult::WouldBlock => {}
192            SocketResult::Continue => {}
193        };
194
195        self.readable_parse(metrics)
196    }
197
198    pub fn readable_parse(&mut self, metrics: &mut SessionMetrics) -> StateResult {
199        let mut state = self.state.take().unwrap();
200        let (sz, cont) = { state.parse_and_handle(self.frontend.read_buffer.data()) };
201        self.frontend.read_buffer.consume(sz);
202        self.frontend.readiness.interest = state.interest;
203        self.state = Some(state);
204
205        match cont {
206            state::FrameResult::Close => StateResult::CloseSession,
207            state::FrameResult::Continue => StateResult::Continue,
208            state::FrameResult::ConnectBackend(id) => StateResult::ConnectBackend,
209        }
210
211        /*let is_initial = unwrap_msg!(self.state.as_ref()).request == Some(RequestState::Initial);
212        // if there's no host, continue parsing until we find it
213        let has_host = unwrap_msg!(self.state.as_ref()).has_host();
214        if !has_host {
215          self.state = Some(parse_request_until_stop(unwrap_msg!(self.state.take()), &self.request_id,
216            &mut self.front_buf.as_mut().unwrap(), &self.sticky_name));
217          if unwrap_msg!(self.state.as_ref()).is_front_error() {
218            self.log_request_error(metrics, "front parsing error, closing the connection");
219            incr!("http.front_parse_errors");
220
221            // increment active requests here because it will be decremented right away
222            // when closing the connection. It's slightly easier than decrementing it
223            // at every place we return SessionResult::CloseSession
224            gauge_add!("http.active_requests", 1);
225
226            return SessionResult::CloseSession;
227          }
228        }
229        */
230    }
231
232    // Forward content to session
233    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
234        trace!("http2 writable");
235        error!("todo[{}:{}]: writable", file!(), line!());
236
237        let mut state = self.state.take().unwrap();
238        //FIXME: do that in a loop until no more frames or WouldBlock
239        match state.gen(self.frontend.write_buffer.space()) {
240            Ok(sz) => {
241                self.frontend.write_buffer.fill(sz);
242                //FIXME: use real condition here to indicate there was nothing to write
243                if sz == 0 {}
244            }
245            Err(e) => {
246                self.state = Some(state);
247                error!("error serializing to front write buffer: {:?}", e);
248                return StateResult::CloseSession;
249            }
250        }
251
252        self.frontend.readiness.interest = state.interest;
253
254        let res = self.frontend.write(metrics);
255        match res {
256            SocketResult::Error | SocketResult::Closed => {
257                error!(
258                    "{}\t[{:?}] error writing to front socket, closing",
259                    self.log_ctx, self.frontend_token
260                );
261                incr!("http2.errors");
262                metrics.service_stop();
263                self.frontend.readiness.reset();
264                self.back_readiness.reset();
265                self.state = Some(state);
266                return StateResult::CloseSession;
267            }
268            SocketResult::WouldBlock => {}
269            SocketResult::Continue => {}
270        }
271
272        self.state = Some(state);
273        StateResult::Continue
274    }
275
276    // Forward content to cluster
277    pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
278        trace!("http2 back_writable");
279        error!("todo[{}:{}]: back_writable", file!(), line!());
280        StateResult::CloseSession
281        /*
282        if self.front_buf.output_data_size() == 0 || self.front_buf.next_output_data().len() == 0 {
283          self.frontend.readiness.interest.insert(Ready::READABLE);
284          self.back_readiness.interest.remove(Ready::WRITABLE);
285          return SessionResult::Continue;
286        }
287
288        let tokens = self.tokens().clone();
289        let output_size = self.front_buf.output_data_size();
290
291        let mut sz = 0usize;
292        let mut socket_res = SocketResult::Continue;
293
294        if let Some(ref mut backend) = self.backend {
295          while socket_res == SocketResult::Continue && self.front_buf.output_data_size() > 0 {
296            // no more data in buffer, stop here
297            if self.front_buf.next_output_data().len() == 0 {
298              self.frontend.readiness.interest.insert(Ready::READABLE);
299              self.back_readiness.interest.remove(Ready::WRITABLE);
300              return SessionResult::Continue;
301            }
302
303            let (current_sz, current_res) = backend.socket_write(self.front_buf.next_output_data());
304            socket_res = current_res;
305            self.front_buf.consume_output_data(current_sz);
306            self.front_buf_position += current_sz;
307            sz += current_sz;
308          }
309        }
310
311        metrics.backend_bout += sz;
312
313        if let Some((front,back)) = tokens {
314          debug!("{}\tBACK [{}->{}]: wrote {} bytes of {}", self.log_ctx, front.0, back.0, sz, output_size);
315        }
316        match socket_res {
317          SocketResult::Error | SocketResult::Closed => {
318            error!("{}\tback socket write error, closing connection", self.log_ctx);
319            metrics.service_stop();
320            incr!("http2.errors");
321            self.frontend.readiness.reset();
322            self.back_readiness.reset();
323            return SessionResult::CloseSession;
324          },
325          SocketResult::WouldBlock => {
326            self.back_readiness.event.remove(Ready::WRITABLE);
327
328          },
329          SocketResult::Continue => {}
330        }
331        SessionResult::Continue
332        */
333    }
334
335    // Read content from cluster
336    pub fn back_readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
337        trace!("http2 back_readable");
338        error!("todo[{}:{}]: back_readable", file!(), line!());
339        StateResult::CloseSession
340        /*
341        if self.back_buf.buffer.available_space() == 0 {
342          self.back_readiness.interest.remove(Ready::READABLE);
343          return SessionResult::Continue;
344        }
345
346        let tokens     = self.tokens().clone();
347
348        if let Some(ref mut backend) = self.backend {
349          let (sz, r) = backend.socket_read(&mut self.back_buf.buffer.space());
350          self.back_buf.buffer.fill(sz);
351          self.back_buf.sliced_input(sz);
352          self.back_buf.consume_parsed_data(sz);
353          self.back_buf.slice_output(sz);
354
355          if let Some((front,back)) = tokens {
356            debug!("{}\tBACK  [{}<-{}]: read {} bytes", self.log_ctx, front.0, back.0, sz);
357          }
358
359          if r != SocketResult::Continue || sz == 0 {
360            self.back_readiness.event.remove(Ready::READABLE);
361          }
362          if sz > 0 {
363            self.frontend.readiness.interest.insert(Ready::WRITABLE);
364            metrics.backend_bin += sz;
365          }
366
367          match r {
368            SocketResult::Error => {
369              error!("{}\tback socket read error, closing connection", self.log_ctx);
370              metrics.service_stop();
371              incr!("http2.errors");
372              self.frontend.readiness.reset();
373              self.back_readiness.reset();
374              return SessionResult::CloseSession;
375            },
376            SocketResult::Closed => {
377              metrics.service_stop();
378              self.frontend.readiness.reset();
379              self.back_readiness.reset();
380              return SessionResult::CloseSession;
381            },
382            SocketResult::WouldBlock => {
383              self.back_readiness.event.remove(Ready::READABLE);
384            },
385            SocketResult::Continue => {}
386          }
387        }
388
389        SessionResult::Continue
390        */
391    }
392}
393
394pub struct Connection<Socket: SocketHandler> {
395    pub socket: Socket,
396    pub readiness: Readiness,
397    pub read_buffer: Checkout,
398    pub write_buffer: Checkout,
399}
400
401impl<Socket: SocketHandler> Connection<Socket> {
402    pub fn new(socket: Socket, read_buffer: Checkout, write_buffer: Checkout) -> Self {
403        Connection {
404            socket,
405            readiness: Readiness {
406                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
407                event: Ready::EMPTY,
408            },
409            //FIXME: capacity can be configured
410            read_buffer,
411            //FIXME: capacity can be configured
412            write_buffer,
413        }
414    }
415
416    pub fn read(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
417        let (sz, res) = self.socket.socket_read(self.read_buffer.space());
418
419        if sz > 0 {
420            count!("bytes_in", sz as i64);
421            metrics.bin += sz;
422
423            self.read_buffer.fill(sz);
424
425            if self.read_buffer.available_space() == 0 {
426                self.readiness.interest.remove(Ready::READABLE);
427            }
428        } else {
429            self.readiness.event.remove(Ready::READABLE);
430        }
431
432        if res == SocketResult::WouldBlock {
433            self.readiness.event.remove(Ready::READABLE);
434        }
435
436        res
437    }
438
439    pub fn write(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
440        let mut sz = 0usize;
441        let mut res = SocketResult::Continue;
442        while res == SocketResult::Continue && self.write_buffer.available_data() > 0 {
443            let (current_sz, current_res) = self.socket.socket_write(self.write_buffer.data());
444            res = current_res;
445            self.write_buffer.consume(current_sz);
446            sz += current_sz;
447        }
448
449        if sz > 0 {
450            count!("bytes_out", sz as i64);
451            metrics.bout += sz;
452        }
453
454        if res == SocketResult::WouldBlock {
455            self.readiness.event.remove(Ready::WRITABLE);
456        }
457
458        res
459    }
460}