sozu_lib/protocol/h2/mod.rs
1//FIXME: we disallow warnings for the HTTP/2 module temporarily
2#![allow(warnings)]
3use std::{cell::RefCell, net::SocketAddr, rc::Weak};
4
5use mio::{net::TcpStream, *};
6use rusty_ulid::Ulid;
7
8use crate::{
9 Protocol, Readiness, SessionMetrics, StateResult,
10 pool::{Checkout, Pool},
11 socket::{SocketHandler, SocketResult},
12 sozu_command::{buffer::fixed::Buffer, ready::Ready},
13};
14
15mod parser;
16mod serializer;
17mod state;
18mod stream;
19
20type BackendToken = Token;
21
22#[derive(PartialEq)]
23pub enum SessionStatus {
24 Normal,
25 DefaultAnswer,
26}
27
28pub struct Http2<Front: SocketHandler> {
29 pub frontend: Connection<Front>,
30 backend: Option<TcpStream>,
31 frontend_token: Token,
32 backend_token: Option<Token>,
33 back_buf: Option<Checkout>,
34 pub cluster_id: Option<String>,
35 pub request_id: Ulid,
36 pub back_readiness: Readiness,
37 pub log_ctx: String,
38 public_address: Option<SocketAddr>,
39 pub state: Option<state::State>,
40 pool: Weak<RefCell<Pool>>,
41}
42
43impl<Front: SocketHandler> Http2<Front> {
44 pub fn new(
45 frontend: Front,
46 frontend_token: Token,
47 pool: Weak<RefCell<Pool>>,
48 public_address: Option<SocketAddr>,
49 client_address: Option<SocketAddr>,
50 sticky_name: String,
51 ) -> Http2<Front> {
52 let request_id = Ulid::generate();
53 let log_ctx = format!("{}\tunknown\t", &request_id);
54 let (read, write) = {
55 let p0 = pool.upgrade().unwrap();
56 let mut p = p0.borrow_mut();
57 let res = (p.checkout().unwrap(), p.checkout().unwrap());
58 res
59 };
60 let session = Http2 {
61 frontend: Connection::new(frontend, read, write),
62 frontend_token,
63 backend: None,
64 backend_token: None,
65 back_buf: None,
66 cluster_id: None,
67 state: Some(state::State::new()),
68 request_id,
69 back_readiness: Readiness {
70 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
71 event: Ready::EMPTY,
72 },
73 log_ctx,
74 public_address,
75 pool,
76 };
77
78 trace!("created http2");
79 session
80 }
81
82 pub fn front_socket(&self) -> &TcpStream {
83 self.frontend.socket.socket_ref()
84 }
85
86 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
87 self.frontend.socket.socket_mut()
88 }
89
90 pub fn back_socket(&self) -> Option<&TcpStream> {
91 self.backend.as_ref()
92 }
93
94 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
95 self.backend.as_mut()
96 }
97
98 pub fn set_back_socket(&mut self, socket: TcpStream) {
99 self.backend = Some(socket);
100 }
101
102 pub fn back_token(&self) -> Option<Token> {
103 self.backend_token
104 }
105
106 pub fn log_context(&self) -> String {
107 if let Some(ref cluster_id) = self.cluster_id {
108 format!("{}\t{}\t", self.request_id, cluster_id)
109 } else {
110 format!("{}\tunknown\t", self.request_id)
111 }
112 }
113
114 pub fn set_back_token(&mut self, token: Token) {
115 self.backend_token = Some(token);
116 }
117
118 pub fn front_hup(&mut self) -> StateResult {
119 StateResult::CloseSession
120 }
121
122 pub fn back_hup(&mut self) -> StateResult {
123 error!("todo[{}:{}]: back_hup", file!(), line!());
124 StateResult::CloseSession
125 /*
126 if self.back_buf.output_data_size() == 0 || self.back_buf.next_output_data().len() == 0 {
127 if self.back_readiness.event.is_readable() {
128 self.back_readiness().interest.insert(Ready::READABLE);
129 error!("Http2::back_hup: backend connection closed but the kernel still holds some data. readiness: {:?} -> {:?}", self.frontend.readiness, self.back_readiness);
130 SessionResult::Continue
131 } else {
132 SessionResult::CloseSession
133 }
134 } else {
135 self.frontend.readiness().interest.insert(Ready::WRITABLE);
136 if self.back_readiness.event.is_readable() {
137 self.back_readiness.interest.insert(Ready::READABLE);
138 }
139 SessionResult::Continue
140 }
141 */
142 }
143
144 // Read content from the session
145 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
146 trace!("http2 readable");
147 error!("todo[{}:{}]: readable", file!(), line!());
148
149 /* do not handle buffer pooling for now
150 if self.front_buf.is_none() {
151 if let Some(p) = self.pool.upgrade() {
152 if let Some(buf) = p.borrow_mut().checkout() {
153 self.front_buf = Some(buf);
154 } else {
155 error!("cannot get front buffer from pool, closing");
156 return SessionResult::CloseSession;
157 }
158 }
159 }
160 */
161
162 if self.frontend.read_buffer.available_space() == 0 {
163 if self.backend_token == None {
164 //let answer_413 = "HTTP/1.1 413 Payload Too Large\r\nContent-Length: 0\r\n\r\n";
165 //self.set_answer(DefaultAnswerStatus::Answer413, Rc::new(Vec::from(answer_413.as_bytes())));
166 self.frontend.readiness.interest.remove(Ready::READABLE);
167 self.frontend.readiness.interest.insert(Ready::WRITABLE);
168 } else {
169 self.frontend.readiness.interest.remove(Ready::READABLE);
170 self.back_readiness.interest.insert(Ready::WRITABLE);
171 }
172 return StateResult::Continue;
173 }
174
175 let res = self.frontend.read(metrics);
176
177 match res {
178 SocketResult::Error => {
179 let front_readiness = self.frontend.readiness.clone();
180 let back_readiness = self.back_readiness.clone();
181 error!(
182 "front socket error, closing the connection. Readiness: {:?} -> {:?}",
183 front_readiness, back_readiness
184 );
185 return StateResult::CloseSession;
186 }
187 SocketResult::Closed => {
188 return StateResult::CloseSession;
189 }
190 SocketResult::WouldBlock => {}
191 SocketResult::Continue => {}
192 };
193
194 self.readable_parse(metrics)
195 }
196
197 pub fn readable_parse(&mut self, metrics: &mut SessionMetrics) -> StateResult {
198 let mut state = self.state.take().unwrap();
199 let (sz, cont) = { state.parse_and_handle(self.frontend.read_buffer.data()) };
200 self.frontend.read_buffer.consume(sz);
201 self.frontend.readiness.interest = state.interest;
202 self.state = Some(state);
203
204 match cont {
205 state::FrameResult::Close => StateResult::CloseSession,
206 state::FrameResult::Continue => StateResult::Continue,
207 state::FrameResult::ConnectBackend(id) => StateResult::ConnectBackend,
208 }
209
210 /*let is_initial = unwrap_msg!(self.state.as_ref()).request == Some(RequestState::Initial);
211 // if there's no host, continue parsing until we find it
212 let has_host = unwrap_msg!(self.state.as_ref()).has_host();
213 if !has_host {
214 self.state = Some(parse_request_until_stop(unwrap_msg!(self.state.take()), &self.request_id,
215 &mut self.front_buf.as_mut().unwrap(), &self.sticky_name));
216 if unwrap_msg!(self.state.as_ref()).is_front_error() {
217 self.log_request_error(metrics, "front parsing error, closing the connection");
218 incr!("http.front_parse_errors");
219
220 // increment active requests here because it will be decremented right away
221 // when closing the connection. It's slightly easier than decrementing it
222 // at every place we return SessionResult::CloseSession
223 gauge_add!("http.active_requests", 1);
224
225 return SessionResult::CloseSession;
226 }
227 }
228 */
229 }
230
231 // Forward content to session
232 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
233 trace!("http2 writable");
234 error!("todo[{}:{}]: writable", file!(), line!());
235
236 let mut state = self.state.take().unwrap();
237 //FIXME: do that in a loop until no more frames or WouldBlock
238 match state.r#gen(self.frontend.write_buffer.space()) {
239 Ok(sz) => {
240 self.frontend.write_buffer.fill(sz);
241 //FIXME: use real condition here to indicate there was nothing to write
242 if sz == 0 {}
243 }
244 Err(e) => {
245 self.state = Some(state);
246 error!("error serializing to front write buffer: {:?}", e);
247 return StateResult::CloseSession;
248 }
249 }
250
251 self.frontend.readiness.interest = state.interest;
252
253 let res = self.frontend.write(metrics);
254 match res {
255 SocketResult::Error | SocketResult::Closed => {
256 error!(
257 "{}\t[{:?}] error writing to front socket, closing",
258 self.log_ctx, self.frontend_token
259 );
260 incr!("http2.errors");
261 metrics.service_stop();
262 self.frontend.readiness.reset();
263 self.back_readiness.reset();
264 self.state = Some(state);
265 return StateResult::CloseSession;
266 }
267 SocketResult::WouldBlock => {}
268 SocketResult::Continue => {}
269 }
270
271 self.state = Some(state);
272 StateResult::Continue
273 }
274
275 // Forward content to cluster
276 pub fn back_writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
277 trace!("http2 back_writable");
278 error!("todo[{}:{}]: back_writable", file!(), line!());
279 StateResult::CloseSession
280 /*
281 if self.front_buf.output_data_size() == 0 || self.front_buf.next_output_data().len() == 0 {
282 self.frontend.readiness.interest.insert(Ready::READABLE);
283 self.back_readiness.interest.remove(Ready::WRITABLE);
284 return SessionResult::Continue;
285 }
286
287 let tokens = self.tokens().clone();
288 let output_size = self.front_buf.output_data_size();
289
290 let mut sz = 0usize;
291 let mut socket_res = SocketResult::Continue;
292
293 if let Some(ref mut backend) = self.backend {
294 while socket_res == SocketResult::Continue && self.front_buf.output_data_size() > 0 {
295 // no more data in buffer, stop here
296 if self.front_buf.next_output_data().len() == 0 {
297 self.frontend.readiness.interest.insert(Ready::READABLE);
298 self.back_readiness.interest.remove(Ready::WRITABLE);
299 return SessionResult::Continue;
300 }
301
302 let (current_sz, current_res) = backend.socket_write(self.front_buf.next_output_data());
303 socket_res = current_res;
304 self.front_buf.consume_output_data(current_sz);
305 self.front_buf_position += current_sz;
306 sz += current_sz;
307 }
308 }
309
310 metrics.backend_bout += sz;
311
312 if let Some((front,back)) = tokens {
313 debug!("{}\tBACK [{}->{}]: wrote {} bytes of {}", self.log_ctx, front.0, back.0, sz, output_size);
314 }
315 match socket_res {
316 SocketResult::Error | SocketResult::Closed => {
317 error!("{}\tback socket write error, closing connection", self.log_ctx);
318 metrics.service_stop();
319 incr!("http2.errors");
320 self.frontend.readiness.reset();
321 self.back_readiness.reset();
322 return SessionResult::CloseSession;
323 },
324 SocketResult::WouldBlock => {
325 self.back_readiness.event.remove(Ready::WRITABLE);
326
327 },
328 SocketResult::Continue => {}
329 }
330 SessionResult::Continue
331 */
332 }
333
334 // Read content from cluster
335 pub fn back_readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
336 trace!("http2 back_readable");
337 error!("todo[{}:{}]: back_readable", file!(), line!());
338 StateResult::CloseSession
339 /*
340 if self.back_buf.buffer.available_space() == 0 {
341 self.back_readiness.interest.remove(Ready::READABLE);
342 return SessionResult::Continue;
343 }
344
345 let tokens = self.tokens().clone();
346
347 if let Some(ref mut backend) = self.backend {
348 let (sz, r) = backend.socket_read(&mut self.back_buf.buffer.space());
349 self.back_buf.buffer.fill(sz);
350 self.back_buf.sliced_input(sz);
351 self.back_buf.consume_parsed_data(sz);
352 self.back_buf.slice_output(sz);
353
354 if let Some((front,back)) = tokens {
355 debug!("{}\tBACK [{}<-{}]: read {} bytes", self.log_ctx, front.0, back.0, sz);
356 }
357
358 if r != SocketResult::Continue || sz == 0 {
359 self.back_readiness.event.remove(Ready::READABLE);
360 }
361 if sz > 0 {
362 self.frontend.readiness.interest.insert(Ready::WRITABLE);
363 metrics.backend_bin += sz;
364 }
365
366 match r {
367 SocketResult::Error => {
368 error!("{}\tback socket read error, closing connection", self.log_ctx);
369 metrics.service_stop();
370 incr!("http2.errors");
371 self.frontend.readiness.reset();
372 self.back_readiness.reset();
373 return SessionResult::CloseSession;
374 },
375 SocketResult::Closed => {
376 metrics.service_stop();
377 self.frontend.readiness.reset();
378 self.back_readiness.reset();
379 return SessionResult::CloseSession;
380 },
381 SocketResult::WouldBlock => {
382 self.back_readiness.event.remove(Ready::READABLE);
383 },
384 SocketResult::Continue => {}
385 }
386 }
387
388 SessionResult::Continue
389 */
390 }
391}
392
393pub struct Connection<Socket: SocketHandler> {
394 pub socket: Socket,
395 pub readiness: Readiness,
396 pub read_buffer: Checkout,
397 pub write_buffer: Checkout,
398}
399
400impl<Socket: SocketHandler> Connection<Socket> {
401 pub fn new(socket: Socket, read_buffer: Checkout, write_buffer: Checkout) -> Self {
402 Connection {
403 socket,
404 readiness: Readiness {
405 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
406 event: Ready::EMPTY,
407 },
408 //FIXME: capacity can be configured
409 read_buffer,
410 //FIXME: capacity can be configured
411 write_buffer,
412 }
413 }
414
415 pub fn read(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
416 let (sz, res) = self.socket.socket_read(self.read_buffer.space());
417
418 if sz > 0 {
419 count!("bytes_in", sz as i64);
420 metrics.bin += sz;
421
422 self.read_buffer.fill(sz);
423
424 if self.read_buffer.available_space() == 0 {
425 self.readiness.interest.remove(Ready::READABLE);
426 }
427 } else {
428 self.readiness.event.remove(Ready::READABLE);
429 }
430
431 if res == SocketResult::WouldBlock {
432 self.readiness.event.remove(Ready::READABLE);
433 }
434
435 res
436 }
437
438 pub fn write(&mut self, metrics: &mut SessionMetrics) -> SocketResult {
439 let mut sz = 0usize;
440 let mut res = SocketResult::Continue;
441 while res == SocketResult::Continue && self.write_buffer.available_data() > 0 {
442 let (current_sz, current_res) = self.socket.socket_write(self.write_buffer.data());
443 res = current_res;
444 self.write_buffer.consume(current_sz);
445 sz += current_sz;
446 }
447
448 if sz > 0 {
449 count!("bytes_out", sz as i64);
450 metrics.bout += sz;
451 }
452
453 if res == SocketResult::WouldBlock {
454 self.readiness.event.remove(Ready::WRITABLE);
455 }
456
457 res
458 }
459}