sozu_lib/protocol/h2/mod.rs
1//FIXME: we disallow warnings for the HTTP/2 module temporarily
2#![allow(warnings)]
3use std::{cell::RefCell, net::SocketAddr, rc::Weak};
4
5use mio::{net::TcpStream, *};
6use rusty_ulid::Ulid;
7
8use crate::{
9 pool::{Checkout, Pool},
10 socket::{SocketHandler, SocketResult},
11 sozu_command::buffer::fixed::Buffer,
12 sozu_command::ready::Ready,
13 {Protocol, Readiness, SessionMetrics, StateResult},
14};
15
16mod parser;
17mod serializer;
18mod state;
19mod stream;
20
21type BackendToken = Token;
22
23#[derive(PartialEq)]
24pub enum SessionStatus {
25 Normal,
26 DefaultAnswer,
27}
28
29pub struct Http2<Front: SocketHandler> {
30 pub frontend: Connection<Front>,
31 backend: Option<TcpStream>,
32 frontend_token: Token,
33 backend_token: Option<Token>,
34 back_buf: Option<Checkout>,
35 pub cluster_id: Option<String>,
36 pub request_id: Ulid,
37 pub back_readiness: Readiness,
38 pub log_ctx: String,
39 public_address: Option<SocketAddr>,
40 pub state: Option<state::State>,
41 pool: Weak<RefCell<Pool>>,
42}
43
44impl<Front: SocketHandler> Http2<Front> {
45 pub fn new(
46 frontend: Front,
47 frontend_token: Token,
48 pool: Weak<RefCell<Pool>>,
49 public_address: Option<SocketAddr>,
50 client_address: Option<SocketAddr>,
51 sticky_name: String,
52 ) -> Http2<Front> {
53 let request_id = Ulid::generate();
54 let log_ctx = format!("{}\tunknown\t", &request_id);
55 let (read, write) = {
56 let p0 = pool.upgrade().unwrap();
57 let mut p = p0.borrow_mut();
58 let res = (p.checkout().unwrap(), p.checkout().unwrap());
59 res
60 };
61 let session = Http2 {
62 frontend: Connection::new(frontend, read, write),
63 frontend_token,
64 backend: None,
65 backend_token: None,
66 back_buf: None,
67 cluster_id: None,
68 state: Some(state::State::new()),
69 request_id,
70 back_readiness: Readiness {
71 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
72 event: Ready::EMPTY,
73 },
74 log_ctx,
75 public_address,
76 pool,
77 };
78
79 trace!("created http2");
80 session
81 }
82
83 pub fn front_socket(&self) -> &TcpStream {
84 self.frontend.socket.socket_ref()
85 }
86
87 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
88 self.frontend.socket.socket_mut()
89 }
90
91 pub fn back_socket(&self) -> Option<&TcpStream> {
92 self.backend.as_ref()
93 }
94
95 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
96 self.backend.as_mut()
97 }
98
99 pub fn set_back_socket(&mut self, socket: TcpStream) {
100 self.backend = Some(socket);
101 }
102
103 pub fn back_token(&self) -> Option<Token> {
104 self.backend_token
105 }
106
107 pub fn log_context(&self) -> String {
108 if let Some(ref cluster_id) = self.cluster_id {
109 format!("{}\t{}\t", self.request_id, cluster_id)
110 } else {
111 format!("{}\tunknown\t", self.request_id)
112 }
113 }
114
115 pub fn set_back_token(&mut self, token: Token) {
116 self.backend_token = Some(token);
117 }
118
119 pub fn front_hup(&mut self) -> StateResult {
120 StateResult::CloseSession
121 }
122
123 pub fn back_hup(&mut self) -> StateResult {
124 error!("todo[{}:{}]: back_hup", file!(), line!());
125 StateResult::CloseSession
126 /*
127 if self.back_buf.output_data_size() == 0 || self.back_buf.next_output_data().len() == 0 {
128 if self.back_readiness.event.is_readable() {
129 self.back_readiness().interest.insert(Ready::READABLE);
130 error!("Http2::back_hup: backend connection closed but the kernel still holds some data. readiness: {:?} -> {:?}", self.frontend.readiness, self.back_readiness);
131 SessionResult::Continue
132 } else {
133 SessionResult::CloseSession
134 }
135 } else {
136 self.frontend.readiness().interest.insert(Ready::WRITABLE);
137 if self.back_readiness.event.is_readable() {
138 self.back_readiness.interest.insert(Ready::READABLE);
139 }
140 SessionResult::Continue
141 }
142 */
143 }
144
145 // Read content from the session
146 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
147 trace!("http2 readable");
148 error!("todo[{}:{}]: readable", file!(), line!());
149
150 /* do not handle buffer pooling for now
151 if self.front_buf.is_none() {
152 if let Some(p) = self.pool.upgrade() {
153 if let Some(buf) = p.borrow_mut().checkout() {
154 self.front_buf = Some(buf);
155 } else {
156 error!("cannot get front buffer from pool, closing");
157 return SessionResult::CloseSession;
158 }
159 }
160 }
161 */
162
163 if self.frontend.read_buffer.available_space() == 0 {
164 if self.backend_token == None {
165 //let answer_413 = "HTTP/1.1 413 Payload Too Large\r\nContent-Length: 0\r\n\r\n";
166 //self.set_answer(DefaultAnswerStatus::Answer413, Rc::new(Vec::from(answer_413.as_bytes())));
167 self.frontend.readiness.interest.remove(Ready::READABLE);
168 self.frontend.readiness.interest.insert(Ready::WRITABLE);
169 } else {
170 self.frontend.readiness.interest.remove(Ready::READABLE);
171 self.back_readiness.interest.insert(Ready::WRITABLE);
172 }
173 return StateResult::Continue;
174 }
175
176 let res = self.frontend.read(metrics);
177
178 match res {
179 SocketResult::Error => {
180 let front_readiness = self.frontend.readiness.clone();
181 let back_readiness = self.back_readiness.clone();
182 error!(
183 "front socket error, closing the connection. Readiness: {:?} -> {:?}",
184 front_readiness, back_readiness
185 );
186 return StateResult::CloseSession;
187 }
188 SocketResult::Closed => {
189 return StateResult::CloseSession;
190 }
191 SocketResult::WouldBlock => {}
192 SocketResult::Continue => {}
193 };
194
195 self.readable_parse(metrics)
196 }
197
198 pub fn readable_parse(&mut self, metrics: &mut SessionMetrics) -> StateResult {
199 let mut state = self.state.take().unwrap();
200 let (sz, cont) = { state.parse_and_handle(self.frontend.read_buffer.data()) };
201 self.frontend.read_buffer.consume(sz);
202 self.frontend.readiness.interest = state.interest;
203 self.state = Some(state);
204
205 match cont {
206 state::FrameResult::Close => StateResult::CloseSession,
207 state::FrameResult::Continue => StateResult::Continue,
208 state::FrameResult::ConnectBackend(id) => StateResult::ConnectBackend,
209 }
210
211 /*let is_initial = unwrap_msg!(self.state.as_ref()).request == Some(RequestState::Initial);
212 // if there's no host, continue parsing until we find it
213 let has_host = unwrap_msg!(self.state.as_ref()).has_host();
214 if !has_host {
215 self.state = Some(parse_request_until_stop(unwrap_msg!(self.state.take()), &self.request_id,
216 &mut self.front_buf.as_mut().unwrap(), &self.sticky_name));
217 if unwrap_msg!(self.state.as_ref()).is_front_error() {
218 self.log_request_error(metrics, "front parsing error, closing the connection");
219 incr!("http.front_parse_errors");
220
221 // increment active requests here because it will be decremented right away
222 // when closing the connection. It's slightly easier than decrementing it
223 // at every place we return SessionResult::CloseSession
224 gauge_add!("http.active_requests", 1);
225
226 return SessionResult::CloseSession;
227 }
228 }
229 */
230 }
231
232 // Forward content to session
233 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
234 trace!("http2 writable");
235 error!("todo[{}:{}]: writable", file!(), line!());
236
237 let mut state = self.state.take().unwrap();
238 //FIXME: do that in a loop until no more frames or WouldBlock
239 match state.gen(self.frontend.write_buffer.space()) {
240 Ok(sz) => {
241 self.frontend.write_buffer.fill(sz);
242 //FIXME: use real condition here to indicate there was nothing to write
243 if sz == 0 {}
244 }
245 Err(e) => {
246 self.state = Some(state);
247 error!("error serializing to front write buffer: {:?}", e);
248 return StateResult::CloseSession;
249 }
250 }
251
252 self.frontend.readiness.interest = state.interest;
253
254 let res = self.frontend.write(metrics);
255 match res {
256 SocketResult::Error | SocketResult::Closed => {
257 error!(
258 "{}\t[{:?}] error writing to front socket, closing",
259 self.log_ctx, self.frontend_token
260 );
261 incr!("http2.errors");
262 metrics.service_stop();
263 self.frontend.readiness.reset();
264 self.back_readiness.reset();
265 self.state = Some(state);
266 return StateResult::CloseSession;
267 }
268 SocketResult::WouldBlock => {}
269 SocketResult::Continue => {}
270 }
271
272 self.state = Some(state);
273 StateResult::Continue
274 }
275
276 // Forward content to cluster
277 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
278 trace!("http2 back_writable");
279 error!("todo[{}:{}]: back_writable", file!(), line!());
280 StateResult::CloseSession
281 /*
282 if self.front_buf.output_data_size() == 0 || self.front_buf.next_output_data().len() == 0 {
283 self.frontend.readiness.interest.insert(Ready::READABLE);
284 self.back_readiness.interest.remove(Ready::WRITABLE);
285 return SessionResult::Continue;
286 }
287
288 let tokens = self.tokens().clone();
289 let output_size = self.front_buf.output_data_size();
290
291 let mut sz = 0usize;
292 let mut socket_res = SocketResult::Continue;
293
294 if let Some(ref mut backend) = self.backend {
295 while socket_res == SocketResult::Continue && self.front_buf.output_data_size() > 0 {
296 // no more data in buffer, stop here
297 if self.front_buf.next_output_data().len() == 0 {
298 self.frontend.readiness.interest.insert(Ready::READABLE);
299 self.back_readiness.interest.remove(Ready::WRITABLE);
300 return SessionResult::Continue;
301 }
302
303 let (current_sz, current_res) = backend.socket_write(self.front_buf.next_output_data());
304 socket_res = current_res;
305 self.front_buf.consume_output_data(current_sz);
306 self.front_buf_position += current_sz;
307 sz += current_sz;
308 }
309 }
310
311 metrics.backend_bout += sz;
312
313 if let Some((front,back)) = tokens {
314 debug!("{}\tBACK [{}->{}]: wrote {} bytes of {}", self.log_ctx, front.0, back.0, sz, output_size);
315 }
316 match socket_res {
317 SocketResult::Error | SocketResult::Closed => {
318 error!("{}\tback socket write error, closing connection", self.log_ctx);
319 metrics.service_stop();
320 incr!("http2.errors");
321 self.frontend.readiness.reset();
322 self.back_readiness.reset();
323 return SessionResult::CloseSession;
324 },
325 SocketResult::WouldBlock => {
326 self.back_readiness.event.remove(Ready::WRITABLE);
327
328 },
329 SocketResult::Continue => {}
330 }
331 SessionResult::Continue
332 */
333 }
334
335 // Read content from cluster
336 pub fn back_readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
337 trace!("http2 back_readable");
338 error!("todo[{}:{}]: back_readable", file!(), line!());
339 StateResult::CloseSession
340 /*
341 if self.back_buf.buffer.available_space() == 0 {
342 self.back_readiness.interest.remove(Ready::READABLE);
343 return SessionResult::Continue;
344 }
345
346 let tokens = self.tokens().clone();
347
348 if let Some(ref mut backend) = self.backend {
349 let (sz, r) = backend.socket_read(&mut self.back_buf.buffer.space());
350 self.back_buf.buffer.fill(sz);
351 self.back_buf.sliced_input(sz);
352 self.back_buf.consume_parsed_data(sz);
353 self.back_buf.slice_output(sz);
354
355 if let Some((front,back)) = tokens {
356 debug!("{}\tBACK [{}<-{}]: read {} bytes", self.log_ctx, front.0, back.0, sz);
357 }
358
359 if r != SocketResult::Continue || sz == 0 {
360 self.back_readiness.event.remove(Ready::READABLE);
361 }
362 if sz > 0 {
363 self.frontend.readiness.interest.insert(Ready::WRITABLE);
364 metrics.backend_bin += sz;
365 }
366
367 match r {
368 SocketResult::Error => {
369 error!("{}\tback socket read error, closing connection", self.log_ctx);
370 metrics.service_stop();
371 incr!("http2.errors");
372 self.frontend.readiness.reset();
373 self.back_readiness.reset();
374 return SessionResult::CloseSession;
375 },
376 SocketResult::Closed => {
377 metrics.service_stop();
378 self.frontend.readiness.reset();
379 self.back_readiness.reset();
380 return SessionResult::CloseSession;
381 },
382 SocketResult::WouldBlock => {
383 self.back_readiness.event.remove(Ready::READABLE);
384 },
385 SocketResult::Continue => {}
386 }
387 }
388
389 SessionResult::Continue
390 */
391 }
392}
393
394pub struct Connection<Socket: SocketHandler> {
395 pub socket: Socket,
396 pub readiness: Readiness,
397 pub read_buffer: Checkout,
398 pub write_buffer: Checkout,
399}
400
401impl<Socket: SocketHandler> Connection<Socket> {
402 pub fn new(socket: Socket, read_buffer: Checkout, write_buffer: Checkout) -> Self {
403 Connection {
404 socket,
405 readiness: Readiness {
406 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
407 event: Ready::EMPTY,
408 },
409 //FIXME: capacity can be configured
410 read_buffer,
411 //FIXME: capacity can be configured
412 write_buffer,
413 }
414 }
415
416 pub fn read(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
417 let (sz, res) = self.socket.socket_read(self.read_buffer.space());
418
419 if sz > 0 {
420 count!("bytes_in", sz as i64);
421 metrics.bin += sz;
422
423 self.read_buffer.fill(sz);
424
425 if self.read_buffer.available_space() == 0 {
426 self.readiness.interest.remove(Ready::READABLE);
427 }
428 } else {
429 self.readiness.event.remove(Ready::READABLE);
430 }
431
432 if res == SocketResult::WouldBlock {
433 self.readiness.event.remove(Ready::READABLE);
434 }
435
436 res
437 }
438
439 pub fn write(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
440 let mut sz = 0usize;
441 let mut res = SocketResult::Continue;
442 while res == SocketResult::Continue && self.write_buffer.available_data() > 0 {
443 let (current_sz, current_res) = self.socket.socket_write(self.write_buffer.data());
444 res = current_res;
445 self.write_buffer.consume(current_sz);
446 sz += current_sz;
447 }
448
449 if sz > 0 {
450 count!("bytes_out", sz as i64);
451 metrics.bout += sz;
452 }
453
454 if res == SocketResult::WouldBlock {
455 self.readiness.event.remove(Ready::WRITABLE);
456 }
457
458 res
459 }
460}