sozu_lib/protocol/kawa_h1/mod.rs

pub mod answers;
pub mod diagnostics;
pub mod editor;
pub mod parser;

use std::{
    cell::RefCell,
    io::ErrorKind,
    net::{Shutdown, SocketAddr},
    rc::{Rc, Weak},
    time::{Duration, Instant},
};

use mio::{net::TcpStream, Interest, Token};
use rusty_ulid::Ulid;
use sozu_command::{
    config::MAX_LOOP_ITERATIONS,
    logging::EndpointRecord,
    proto::command::{Event, EventKind, ListenerType},
};
// use time::{Duration, Instant};

use crate::{
    backends::{Backend, BackendError},
    pool::{Checkout, Pool},
    protocol::{
        http::{
            answers::DefaultAnswerStream,
            diagnostics::{diagnostic_400_502, diagnostic_413_507},
            editor::HttpContext,
            parser::Method,
        },
        pipe::WebSocketContext,
        SessionState,
    },
    retry::RetryPolicy,
    router::Route,
    server::{push_event, CONN_RETRIES},
    socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
    sozu_command::{logging::LogContext, ready::Ready},
    timer::TimeoutContainer,
    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
    L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness,
    RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
};

/// This macro is defined uniquely in this module to help track kawa h1
/// issues inside Sōzu
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "KAWA-H1\t{}\tSession(public={}, session={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
            $self.context.log_context(),
            $self.context.public_address.to_string(),
            $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness,
            $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.backend_readiness,
        )
    };
}

/// Generic HTTP stream representation from the Kawa crate, using Sōzu's Checkout as its buffer
type GenericHttpStream = kawa::Kawa<Checkout>;

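/// Let Kawa use Sōzu's pooled buffers (`Checkout`) directly as its backing storage.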
impl kawa::AsBuffer for Checkout {
    fn as_buffer(&self) -> &[u8] {
        self.inner.extra()
    }
    fn as_mut_buffer(&mut self) -> &mut [u8] {
        self.inner.extra_mut()
    }
}

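/// Answers generated by the proxy itself instead of being forwarded from a backend,
/// one variant per HTTP status code it can produce, carrying the data needed to fill
/// the corresponding answer template.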
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DefaultAnswer {
    Answer301 {
        location: String,
    },
    Answer400 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    Answer401 {},
    Answer404 {},
    Answer408 {
        duration: String,
    },
    Answer413 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        capacity: usize,
    },
    Answer502 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    Answer503 {
        message: String,
    },
    Answer504 {
        duration: String,
    },
    Answer507 {
        phase: kawa::ParsingPhaseMarker,
        message: String,
        capacity: usize,
    },
}

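/// Map a default answer to its HTTP status code.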
impl From<&DefaultAnswer> for u16 {
    fn from(answer: &DefaultAnswer) -> u16 {
        match answer {
            DefaultAnswer::Answer301 { .. } => 301,
            DefaultAnswer::Answer400 { .. } => 400,
            DefaultAnswer::Answer401 { .. } => 401,
            DefaultAnswer::Answer404 { .. } => 404,
            DefaultAnswer::Answer408 { .. } => 408,
            DefaultAnswer::Answer413 { .. } => 413,
            DefaultAnswer::Answer502 { .. } => 502,
            DefaultAnswer::Answer503 { .. } => 503,
            DefaultAnswer::Answer504 { .. } => 504,
            DefaultAnswer::Answer507 { .. } => 507,
        }
    }
}

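/// Indicates which part of the request/response exchange a timeout currently relates to.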
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    Request,
    Response,
    WaitingForNewRequest,
    WaitingForResponse,
}

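/// What is streamed back to the client: either the backend's answer, or a default
/// answer generated by the proxy along with its status code.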
pub enum ResponseStream {
    BackendAnswer(GenericHttpStream),
    DefaultAnswer(u16, DefaultAnswerStream),
}

/// Http will be contained in State which itself is contained by Session
pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    answers: Rc<RefCell<answers::HttpAnswers>>,
    pub backend: Option<Rc<RefCell<Backend>>>,
    backend_connection_status: BackendConnectionStatus,
    pub backend_readiness: Readiness,
    pub backend_socket: Option<TcpStream>,
    backend_stop: Option<Instant>,
    pub backend_token: Option<Token>,
    pub container_backend_timeout: TimeoutContainer,
    pub container_frontend_timeout: TimeoutContainer,
    configured_backend_timeout: Duration,
    configured_connect_timeout: Duration,
    configured_frontend_timeout: Duration,
    /// attempts to connect to the backends during the session
    connection_attempts: u8,
    pub frontend_readiness: Readiness,
    pub frontend_socket: Front,
    frontend_token: Token,
    keepalive_count: usize,
    listener: Rc<RefCell<L>>,
    pub request_stream: GenericHttpStream,
    pub response_stream: ResponseStream,
    /// The HTTP context was separated from the State for borrowing reasons.
    /// Calling a kawa parser mutably borrows the State through request_stream or response_stream,
    /// so Http can't be borrowed again to be used in callbacks. HttpContext is an independent
    /// subsection of Http that can be mutably borrowed for parser callbacks.
    pub context: HttpContext,
}

impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
    /// Instantiate a new HTTP SessionState with:
    ///
    /// - frontend_interest: READABLE | HUP | ERROR
    /// - frontend_event: EMPTY
    /// - backend_interest: EMPTY
    /// - backend_event: EMPTY
    ///
    /// Remember to set the events from the previous State!
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        answers: Rc<RefCell<answers::HttpAnswers>>,
        configured_backend_timeout: Duration,
        configured_connect_timeout: Duration,
        configured_frontend_timeout: Duration,
        container_frontend_timeout: TimeoutContainer,
        frontend_socket: Front,
        frontend_token: Token,
        listener: Rc<RefCell<L>>,
        pool: Weak<RefCell<Pool>>,
        protocol: Protocol,
        public_address: SocketAddr,
        request_id: Ulid,
        session_address: Option<SocketAddr>,
        sticky_name: String,
    ) -> Result<Http<Front, L>, AcceptError> {
        let (front_buffer, back_buffer) = match pool.upgrade() {
            Some(pool) => {
                let mut pool = pool.borrow_mut();
                match (pool.checkout(), pool.checkout()) {
                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
                    _ => return Err(AcceptError::BufferCapacityReached),
                }
            }
            None => return Err(AcceptError::BufferCapacityReached),
        };
        Ok(Http {
            answers,
            backend_connection_status: BackendConnectionStatus::NotConnected,
            backend_readiness: Readiness::new(),
            backend_socket: None,
            backend_stop: None,
            backend_token: None,
            backend: None,
            configured_backend_timeout,
            configured_connect_timeout,
            configured_frontend_timeout,
            connection_attempts: 0,
            container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
            container_frontend_timeout,
            frontend_readiness: Readiness {
                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
                event: Ready::EMPTY,
            },
            frontend_socket,
            frontend_token,
            keepalive_count: 0,
            listener,
            request_stream: GenericHttpStream::new(
                kawa::Kind::Request,
                kawa::Buffer::new(front_buffer),
            ),
            response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
                kawa::Kind::Response,
                kawa::Buffer::new(back_buffer),
            )),
            context: HttpContext {
                id: request_id,
                backend_id: None,
                cluster_id: None,

                closing: false,
                keep_alive_backend: true,
                keep_alive_frontend: true,
                protocol,
                public_address,
                session_address,
                sticky_name,
                sticky_session: None,
                sticky_session_found: None,

                method: None,
                authority: None,
                path: None,
                status: None,
                reason: None,
                user_agent: None,
            },
        })
    }

    /// Reset the connection in case of keep-alive to be ready for the next request
    pub fn reset(&mut self) {
        trace!("{} ============== reset", log_context!(self));
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return,
        };

        self.context.id = Ulid::generate();
        self.context.reset();

        self.request_stream.clear();
        response_stream.clear();
        self.keepalive_count += 1;
        gauge_add!("http.active_requests", -1);

        if let Some(backend) = &mut self.backend {
            let mut backend = backend.borrow_mut();
            backend.active_requests = backend.active_requests.saturating_sub(1);
        }

        // reset the front timeout and cancel the back timeout while we are
        // waiting for a new request
        self.container_backend_timeout.cancel();
        self.container_frontend_timeout
            .set_duration(self.configured_frontend_timeout);
        self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;

        // We are resetting the offsets of the request and response stream buffers.
        // We do have to keep the cursor position on the request if there is data
        // left in the request stream, to preserve HTTP pipelining.

        // Print any left-over response buffer output to track in which cases it
        // may happen
        let response_storage = &mut response_stream.storage;
        if !response_storage.is_empty() {
            warn!(
                "{} Leftover fragment from response: {}",
                log_context!(self),
                parser::view(
                    response_storage.used(),
                    16,
                    &[response_storage.start, response_storage.end,],
                )
            );
        }

        response_storage.clear();
        if !self.request_stream.storage.is_empty() {
            self.frontend_readiness.event.insert(Ready::READABLE);
        } else {
            self.request_stream.storage.clear();
        }
    }

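    /// Called when the frontend socket is readable: read into the request stream, parse it
    /// with kawa, and report whether to connect to a backend, keep going or close the session.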
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== readable", log_context!(self));
        if !self.container_frontend_timeout.reset() {
            error!(
                "could not reset front timeout {:?}",
                self.configured_frontend_timeout
            );
            self.print_state(self.protocol_string());
        }

        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            ResponseStream::DefaultAnswer(..) => {
                error!(
                    "{} Sending default answer, should not read from frontend socket",
                    log_context!(self)
                );

                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return StateResult::Continue;
            }
        };

        if self.request_stream.storage.is_full() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            if self.request_stream.is_main_phase() {
                self.backend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // client has filled its buffer and we can't empty it
                self.set_answer(DefaultAnswer::Answer413 {
                    capacity: self.request_stream.storage.capacity(),
                    phase: self.request_stream.parsing_phase.marker(),
                    message: diagnostic_413_507(self.request_stream.parsing_phase),
                });
            }
            return StateResult::Continue;
        }

        let (size, socket_state) = self
            .frontend_socket
            .socket_read(self.request_stream.storage.space());

        debug!("{} Read {} bytes", log_context!(self), size);

        if size > 0 {
            self.request_stream.storage.fill(size);
            count!("bytes_in", size as i64);
            metrics.bin += size;
            // if self.kawa_request.storage.is_full() {
            //     self.frontend_readiness.interest.remove(Ready::READABLE);
            // }
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                if self.request_stream.is_initial() {
                    // count an error if we were waiting for the first request
                    // otherwise, if we already had one completed request and response,
                    // and are waiting for the next one, we do not count a socket
                    // closing abruptly as an error
                    if self.keepalive_count == 0 {
                        self.frontend_socket.read_error();
                    }
                } else {
                    self.frontend_socket.read_error();
                    self.log_request_error(
                        metrics,
                        &format!(
                            "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
                            self.frontend_readiness,
                            self.backend_readiness,
                        )
                    );
                }
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        trace!("{} ============== readable_parse", log_context!(self));
        let was_initial = self.request_stream.is_initial();
        let was_not_proxying = !self.request_stream.is_main_phase();

        kawa::h1::parse(&mut self.request_stream, &mut self.context);
        // kawa::debug_kawa(&self.request_stream);

        if was_initial && !self.request_stream.is_initial() {
            // if it was the first request, the front timeout duration
            // was set to request_timeout, which is much lower. For future
            // requests on this connection, we can wait a bit more
            self.container_frontend_timeout
                .set_duration(self.configured_frontend_timeout);
            gauge_add!("http.active_requests", 1);
            incr!("http.requests");
        }

        if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
            incr!("http.frontend_parse_errors");
            warn!(
                "{} Parsing request error in {:?}: {}",
                log_context!(self),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        let kawa = &self.request_stream;
                        parser::view(
                            kawa.storage.used(),
                            16,
                            &[
                                kawa.storage.start,
                                kawa.storage.head,
                                index as usize,
                                kawa.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            if response_stream.consumed {
                self.log_request_error(metrics, "Parsing error on the request");
                return StateResult::CloseSession;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, &self.request_stream);
                self.set_answer(DefaultAnswer::Answer400 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return StateResult::Continue;
            }
        }

        if self.request_stream.is_main_phase() {
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            if was_not_proxying {
                // Sozu tries to connect only once all the headers were gathered and edited
                // this could be improved
                trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
                return StateResult::ConnectBackend;
            }
        }
        if self.request_stream.is_terminated() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
        }

        StateResult::Continue
    }

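    /// Called when the frontend socket is writable: write the prepared response to the client
    /// and, once the response is complete, handle upgrades, informational responses and keep-alive.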
    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== writable", log_context!(self));
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return self.writable_default_answer(metrics),
        };

        response_stream.prepare(&mut kawa::h1::BlockConverter);

        let bufs = response_stream.as_io_slice();
        if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
            self.frontend_readiness.interest.remove(Ready::WRITABLE);
            // do not shortcut, response might have been terminated without anything more to send
        }

        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);

        debug!("{} Wrote {} bytes", log_context!(self), size);

        if size > 0 {
            response_stream.consume(size);
            count!("bytes_out", size as i64);
            metrics.bout += size;
            self.backend_readiness.interest.insert(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                self.frontend_socket.write_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "front socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        if self.frontend_socket.socket_wants_write() {
            return StateResult::Continue;
        }

        if response_stream.is_terminated() && response_stream.is_completed() {
            if self.context.closing {
                debug!("{} closing proxy, no keep alive", log_context!(self));
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }

            match response_stream.detached.status_line {
                kawa::StatusLine::Response { code: 101, .. } => {
                    trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
                    self.log_request_success(metrics);
                    return StateResult::Upgrade;
                }
                kawa::StatusLine::Response { code: 100, .. } => {
                    trace!("{} ============== HANDLE CONTINUE!", log_context!(self));
                    response_stream.clear();
                    self.log_request_success(metrics);
                    return StateResult::Continue;
                }
                kawa::StatusLine::Response { code: 103, .. } => {
                    self.backend_readiness.event.insert(Ready::READABLE);
                    trace!("{} ============== HANDLE EARLY HINT!", log_context!(self));
                    response_stream.clear();
                    self.log_request_success(metrics);
                    return StateResult::Continue;
                }
                _ => (),
            }

            let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
            let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
            if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
                && request_length_known
            {
                error!(
                    "{} Response terminated before request, this case is not handled properly yet",
                    log_context!(self)
                );
                incr!("http.early_response_close");
                // FIXME: this will cause problems with pipelining
                // return StateResult::CloseSession;
            }

            // FIXME: we could get smarter about this
            // with no keepalive on backend, we could open a new backend connection
            // with no keepalive on front but keepalive on backend, we could have
            // a pool of connections
            trace!(
                "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
                log_context!(self),
                self.context.keep_alive_frontend,
                self.context.keep_alive_backend,
                response_length_known
            );

            self.log_request_success(metrics);
            return match (
                self.context.keep_alive_frontend,
                self.context.keep_alive_backend,
                response_length_known,
            ) {
                (true, true, true) => {
                    debug!("{} Keep alive frontend/backend", log_context!(self));
                    metrics.reset();
                    self.reset();
                    StateResult::Continue
                }
                (true, false, true) => {
                    debug!("{} Keep alive frontend", log_context!(self));
                    metrics.reset();
                    self.reset();
                    StateResult::CloseBackend
                }
                _ => {
                    debug!("{} No keep alive", log_context!(self));
                    StateResult::CloseSession
                }
            };
        }
        StateResult::Continue
    }

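    /// Write a proxy-generated default answer to the client; the session is closed once it
    /// has been fully sent.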
    fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!(
            "{} ============== writable_default_answer",
            log_context!(self)
        );
        let response_stream = match &mut self.response_stream {
            ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
            _ => return StateResult::CloseSession,
        };
        let bufs = response_stream.as_io_slice();
        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);

        count!("bytes_out", size as i64);
        metrics.bout += size;
        response_stream.consume(size);

        if size == 0 || socket_state != SocketResult::Continue {
            self.frontend_readiness.event.remove(Ready::WRITABLE);
        }

        if response_stream.is_completed() {
            save_http_status_metric(self.context.status, self.context.log_context());
            self.log_default_answer_success(metrics);
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            return StateResult::CloseSession;
        }

        if socket_state == SocketResult::Error {
            self.frontend_socket.write_error();
            self.log_request_error(
                metrics,
                "error writing default answer to front socket, closing",
            );
            StateResult::CloseSession
        } else {
            StateResult::Continue
        }
    }

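    /// Called when the backend socket is writable: forward the parsed and edited request
    /// stream to the backend.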
    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_writable", log_context!(self));
        if let ResponseStream::DefaultAnswer(..) = self.response_stream {
            error!(
                "{}\tsending default answer, should not write to back",
                log_context!(self)
            );
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        self.request_stream.prepare(&mut kawa::h1::BlockConverter);

        let bufs = self.request_stream.as_io_slice();
        if bufs.is_empty() {
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
        debug!("{} Wrote {} bytes", log_context!(self), size);

        if size > 0 {
            self.request_stream.consume(size);
            count!("back_bytes_out", size as i64);
            metrics.backend_bout += size;
            self.frontend_readiness.interest.insert(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::READABLE);
        } else {
            self.backend_readiness.event.remove(Ready::WRITABLE);
        }

        match socket_state {
            // the back socket is not writable anymore, so we can drop
            // the front buffer, no more data can be transmitted.
            // But the socket might still be readable, or if it is
            // closed, we might still have some data in the buffer.
            // As an example, we can get an early response for a large
            // POST request to refuse it and prevent all of the data
            // from being transmitted, with the backend server closing
            // the socket right after sending the response
            // FIXME: shouldn't we test for response_state then?
            SocketResult::Error | SocketResult::Closed => {
                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.backend_readiness.interest.remove(Ready::WRITABLE);
                return SessionResult::Continue;
            }
            SocketResult::WouldBlock => {
                self.backend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        if self.request_stream.is_terminated() && self.request_stream.is_completed() {
            self.backend_readiness.interest.remove(Ready::WRITABLE);

            // cancel the front timeout while we are waiting for the server to answer
            self.container_frontend_timeout.cancel();
            self.container_backend_timeout.reset();
        }
        SessionResult::Continue
    }

    // Read content from cluster
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_readable", log_context!(self));
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset back timeout {:?}",
                log_context!(self),
                self.configured_backend_timeout
            );
            self.print_state(self.protocol_string());
        }

        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => {
                error!(
                    "{} Sending default answer, should not read from backend socket",
                    log_context!(self),
                );

                self.backend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return SessionResult::Continue;
            }
        };

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        if response_stream.storage.is_full() {
            self.backend_readiness.interest.remove(Ready::READABLE);
            if response_stream.is_main_phase() {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // server has filled its buffer and we can't empty it
                let capacity = response_stream.storage.capacity();
                let phase = response_stream.parsing_phase.marker();
                let message = diagnostic_413_507(response_stream.parsing_phase);
                self.set_answer(DefaultAnswer::Answer507 {
                    capacity,
                    phase,
                    message,
                });
            }
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
        debug!("{} Read {} bytes", log_context!(self), size);

        if size > 0 {
            response_stream.storage.fill(size);
            count!("back_bytes_in", size as i64);
            metrics.backend_bin += size;
            // if self.kawa_response.storage.is_full() {
            //     self.backend_readiness.interest.remove(Ready::READABLE);
            // }

            // In case the response starts while the request is not tagged as "terminated",
            // we place the timeout responsibility on the backend.
            // This can happen when:
            // - the request is malformed and doesn't have length information
            // - kawa fails to detect a properly terminated request (e.g. a GET request with no body and no length)
            // - the response can start before the end of the request (e.g. stream processing like compression)
            self.container_frontend_timeout.cancel();
        } else {
            self.backend_readiness.event.remove(Ready::READABLE);
        }

        // TODO: close delimited and backend_hup should be handled better
        match socket_state {
            SocketResult::Error => {
                backend_socket.read_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "back socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return SessionResult::Close;
            }
            SocketResult::WouldBlock | SocketResult::Closed => {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        }

        trace!(
            "{} ============== backend_readable_parse",
            log_context!(self)
        );
        kawa::h1::parse(response_stream, &mut self.context);
        // kawa::debug_kawa(&self.response_stream);

        if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
            incr!("http.backend_parse_errors");
            warn!(
                "{} Parsing response error in {:?}: {}",
                log_context!(self),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        parser::view(
                            response_stream.storage.used(),
                            16,
                            &[
                                response_stream.storage.start,
                                response_stream.storage.head,
                                index as usize,
                                response_stream.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            if response_stream.consumed {
                return SessionResult::Close;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, response_stream);
                self.set_answer(DefaultAnswer::Answer502 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return SessionResult::Continue;
            }
        }

        if response_stream.is_main_phase() {
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
        }
        if response_stream.is_terminated() {
            metrics.backend_stop();
            self.backend_stop = Some(Instant::now());
            self.backend_readiness.interest.remove(Ready::READABLE);
        }
        SessionResult::Continue
    }
}

impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
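    /// Build the endpoint record (method, authority, path, status) used in access logs.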
    fn log_endpoint(&self) -> EndpointRecord {
        EndpointRecord::Http {
            method: self.context.method.as_deref(),
            authority: self.context.authority.as_deref(),
            path: self.context.path.as_deref(),
            reason: self.context.reason.as_deref(),
            status: self.context.status,
        }
    }

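    /// Address of the client, from the parsed context if available, otherwise from the
    /// frontend socket.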
    pub fn get_session_address(&self) -> Option<SocketAddr> {
        self.context
            .session_address
            .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
    }

    pub fn get_backend_address(&self) -> Option<SocketAddr> {
        self.backend
            .as_ref()
            .map(|backend| backend.borrow().address)
            .or_else(|| {
                self.backend_socket
                    .as_ref()
                    .and_then(|backend| backend.peer_addr().ok())
            })
    }

    // The protocol name used in the access logs
    fn protocol_string(&self) -> &'static str {
        match self.context.protocol {
            Protocol::HTTP => "HTTP",
            Protocol::HTTPS => match self.frontend_socket.protocol() {
                TransportProtocol::Ssl2 => "HTTPS-SSL2",
                TransportProtocol::Ssl3 => "HTTPS-SSL3",
                TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
                TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
                TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
                TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }

    /// Capture the HTTP context of this session so it can be logged in the WebSocket access logs
    pub fn websocket_context(&self) -> WebSocketContext {
        WebSocketContext::Http {
            method: self.context.method.clone(),
            authority: self.context.authority.clone(),
            path: self.context.path.clone(),
            reason: self.context.reason.clone(),
            status: self.context.status,
        }
    }

    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        let listener = self.listener.borrow();
        let tags = self.context.authority.as_ref().and_then(|host| {
            let hostname = match host.split_once(':') {
                None => host,
                Some((hostname, _)) => hostname,
            };
            listener.get_tags(hostname)
        });

        let context = self.context.log_context();
        metrics.register_end_of_session(&context);

        log_access! {
            error,
            on_failure: { incr!("unsent-access-logs") },
            message: message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint: self.log_endpoint(),
            tags,
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            user_agent: self.context.user_agent.as_deref(),
        };
    }

    pub fn log_request_success(&self, metrics: &SessionMetrics) {
        save_http_status_metric(self.context.status, self.context.log_context());
        self.log_request(metrics, false, None);
    }

    pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, false, None);
    }

    pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
        incr!("http.errors");
        error!(
            "{} Could not process request properly got: {}",
            log_context!(self),
            message
        );
        self.print_state(self.protocol_string());
        self.log_request(metrics, true, Some(message));
    }

    pub fn set_answer(&mut self, answer: DefaultAnswer) {
        let status = u16::from(&answer);
        if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
            error!(
                "already set the default answer to {}, trying to set to {}",
                old_status, status
            );
        } else {
            match answer {
                DefaultAnswer::Answer301 { .. } => incr!(
                    "http.301.redirection",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer400 { .. } => incr!("http.400.errors"),
                DefaultAnswer::Answer401 { .. } => incr!(
                    "http.401.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer404 { .. } => incr!("http.404.errors"),
                DefaultAnswer::Answer408 { .. } => incr!(
                    "http.408.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer413 { .. } => incr!(
                    "http.413.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer502 { .. } => incr!(
                    "http.502.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer503 { .. } => incr!(
                    "http.503.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer504 { .. } => incr!(
                    "http.504.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer507 { .. } => incr!(
                    "http.507.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
            };
        }

        let mut kawa = self.answers.borrow().get(
            answer,
            self.context.id.to_string(),
            self.context.cluster_id.as_deref(),
            self.context.backend_id.as_deref(),
            self.get_route(),
        );
        kawa.prepare(&mut kawa::h1::BlockConverter);
        self.context.status = Some(status);
        self.context.reason = None;
        self.context.keep_alive_frontend = false;
        self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
        self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
    }

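    /// Peek on the backend socket to check whether the connection is still usable:
    /// a zero-byte read means the peer closed its half of the connection.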
    pub fn test_backend_socket(&self) -> bool {
        match self.backend_socket {
            Some(ref s) => {
                let mut tmp = [0u8; 1];
                let res = s.peek(&mut tmp[..]);

                match res {
                    // if the socket is half open, it will report 0 bytes read (EOF)
                    Ok(0) => false,
                    Ok(_) => true,
                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
                }
            }
            None => false,
        }
    }

    pub fn is_valid_backend_socket(&self) -> bool {
        // only test the socket if it has not been used within the last second
        match self.backend_stop.as_ref() {
            Some(stop_instant) => {
                let now = Instant::now();
                let dur = now - *stop_instant;
                if dur > Duration::from_secs(1) {
                    return self.test_backend_socket();
                }
            }
            None => return self.test_backend_socket(),
        }

        true
    }

    pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
        self.backend_socket = Some(socket);
        self.backend = backend;
    }

    pub fn set_cluster_id(&mut self, cluster_id: String) {
        self.context.cluster_id = Some(cluster_id);
    }

    pub fn set_backend_id(&mut self, backend_id: String) {
        self.context.backend_id = Some(backend_id);
    }

    pub fn set_backend_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }

    pub fn clear_backend_token(&mut self) {
        self.backend_token = None;
    }

    pub fn set_backend_timeout(&mut self, dur: Duration) {
        if let Some(token) = self.backend_token.as_ref() {
            self.container_backend_timeout.set_duration(dur);
            self.container_backend_timeout.set(*token);
        }
    }

    pub fn front_socket(&self) -> &TcpStream {
        self.frontend_socket.socket_ref()
    }

    /// WARNING: if the backend_token is set, this function removes the backend entry
    /// from the session manager so that the entry can be reused for a new backend.
    /// I don't think this is a good idea, but it is a quick fix
    fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
        self.container_backend_timeout.cancel();
        debug!(
            "{}\tPROXY [{}->{}] CLOSED BACKEND",
            log_context!(self),
            self.frontend_token.0,
            self.backend_token
                .map(|t| format!("{}", t.0))
                .unwrap_or_else(|| "-".to_string())
        );

        let proxy = proxy.borrow();
        if let Some(socket) = &mut self.backend_socket.take() {
            if let Err(e) = proxy.deregister_socket(socket) {
                error!(
                    "{} Error deregistering back socket({:?}): {:?}",
                    log_context!(self),
                    socket,
                    e
                );
            }
            if let Err(e) = socket.shutdown(Shutdown::Both) {
                if e.kind() != ErrorKind::NotConnected {
                    error!(
                        "{} Error shutting down back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }
            }
        }

        if let Some(token) = self.backend_token.take() {
            proxy.remove_session(token);

            if self.backend_connection_status != BackendConnectionStatus::NotConnected {
                self.backend_readiness.event = Ready::EMPTY;
            }

            if self.backend_connection_status == BackendConnectionStatus::Connected {
                gauge_add!("backend.connections", -1);
                gauge_add!(
                    "connections_per_backend",
                    -1,
                    self.context.cluster_id.as_deref(),
                    metrics.backend_id.as_deref()
                );
            }

            self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);

            if let Some(backend) = self.backend.take() {
                backend.borrow_mut().dec_connections();
            }
        }
    }

    /// Check the number of connection attempts against authorized connection retries
    fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
        if self.connection_attempts >= CONN_RETRIES {
            error!(
                "{} Max connection attempt reached ({})",
                log_context!(self),
                self.connection_attempts,
            );

            self.set_answer(DefaultAnswer::Answer503 {
                message: format!(
                    "Max connection attempt reached: {}",
                    self.connection_attempts
                ),
            });
            return Err(BackendConnectionError::MaxConnectionRetries(None));
        }
        Ok(())
    }

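    /// When reusing a kept-alive backend connection, verify that its socket is still valid
    /// and update the backend metrics accordingly.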
    fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
        let is_valid_backend_socket = self.is_valid_backend_socket();

        if !is_valid_backend_socket {
            return false;
        }

        // matched on keepalive
        metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());

        metrics.backend_start();
        if let Some(b) = self.backend.as_mut() {
            b.borrow_mut().active_requests += 1;
        }
        true
    }

    // -> host, path, method
    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
        let given_method = self
            .context
            .method
            .as_ref()
            .ok_or(RetrieveClusterError::NoMethod)?;
        let given_authority = self
            .context
            .authority
            .as_deref()
            .ok_or(RetrieveClusterError::NoHost)?;
        let given_path = self
            .context
            .path
            .as_deref()
            .ok_or(RetrieveClusterError::NoPath)?;

        Ok((given_authority, given_path, given_method))
    }

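    /// Render "METHOD authority path" (or as much of it as is known) for logs and default answers.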
    pub fn get_route(&self) -> String {
        if let Some(method) = &self.context.method {
            if let Some(authority) = &self.context.authority {
                if let Some(path) = &self.context.path {
                    return format!("{method} {authority}{path}");
                }
                return format!("{method} {authority}");
            }
            return format!("{method}");
        }
        String::new()
    }

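    /// Resolve the cluster targeted by the current request through the listener's frontends,
    /// setting the appropriate default answer (400, 404, 401 or 301) when resolution fails
    /// or an HTTPS redirect applies.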
    fn cluster_id_from_request(
        &mut self,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<String, RetrieveClusterError> {
        let (host, uri, method) = match self.extract_route() {
            Ok(tuple) => tuple,
            Err(cluster_error) => {
                self.set_answer(DefaultAnswer::Answer400 {
                    message: "Could not extract the route after connection started, this should not happen.".into(),
                    phase: self.request_stream.parsing_phase.marker(),
                    successfully_parsed: "null".into(),
                    partially_parsed: "null".into(),
                    invalid: "null".into(),
                });
                return Err(cluster_error);
            }
        };

        let route_result = self
            .listener
            .borrow()
            .frontend_from_request(host, uri, method);

        let route = match route_result {
            Ok(route) => route,
            Err(frontend_error) => {
                self.set_answer(DefaultAnswer::Answer404 {});
                return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
            }
        };

        let cluster_id = match route {
            Route::ClusterId(cluster_id) => cluster_id,
            Route::Deny => {
                self.set_answer(DefaultAnswer::Answer401 {});
                return Err(RetrieveClusterError::UnauthorizedRoute);
            }
        };

        let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
            && proxy
                .borrow()
                .clusters()
                .get(&cluster_id)
                .map(|cluster| cluster.https_redirect)
                .unwrap_or(false);

        if frontend_should_redirect_https {
            self.set_answer(DefaultAnswer::Answer301 {
                location: format!("https://{host}{uri}"),
            });
            return Err(RetrieveClusterError::UnauthorizedRoute);
        }

        Ok(cluster_id)
    }

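    /// Pick a backend in the given cluster (honouring sticky sessions when enabled), open a
    /// TCP connection to it and record the choice in the metrics and context.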
    pub fn backend_from_request(
        &mut self,
        cluster_id: &str,
        frontend_should_stick: bool,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<TcpStream, BackendConnectionError> {
        let (backend, conn) = self
            .get_backend_for_sticky_session(
                frontend_should_stick,
                self.context.sticky_session_found.as_deref(),
                cluster_id,
                proxy,
            )
            .map_err(|backend_error| {
                // some backend errors are actually retryable
                // TODO: maybe retry or return a different default answer
                self.set_answer(DefaultAnswer::Answer503 {
                    message: backend_error.to_string(),
                });
                BackendConnectionError::Backend(backend_error)
            })?;

        if frontend_should_stick {
            // update the sticky name in case it changed
            self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();

            self.context.sticky_session = Some(
                backend
                    .borrow()
                    .sticky_id
                    .clone()
                    .unwrap_or_else(|| backend.borrow().backend_id.clone()),
            );
        }

        metrics.backend_id = Some(backend.borrow().backend_id.clone());
        metrics.backend_start();
        self.set_backend_id(backend.borrow().backend_id.clone());

        self.backend = Some(backend);
        Ok(conn)
    }

    fn get_backend_for_sticky_session(
        &self,
        frontend_should_stick: bool,
        sticky_session: Option<&str>,
        cluster_id: &str,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
        match (frontend_should_stick, sticky_session) {
            (true, Some(sticky_session)) => proxy
                .borrow()
                .backends()
                .borrow_mut()
                .backend_from_sticky_session(cluster_id, sticky_session),
            _ => proxy
                .borrow()
                .backends()
                .borrow_mut()
                .backend_from_cluster_id(cluster_id),
        }
    }

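    /// Establish a connection to a backend for the current request, reusing the existing
    /// backend connection when possible, otherwise closing it and opening a new one.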
1373    fn connect_to_backend(
1374        &mut self,
1375        session_rc: Rc<RefCell<dyn ProxySession>>,
1376        proxy: Rc<RefCell<dyn L7Proxy>>,
1377        metrics: &mut SessionMetrics,
1378    ) -> Result<BackendConnectAction, BackendConnectionError> {
1379        let old_cluster_id = self.context.cluster_id.clone();
1380        let old_backend_token = self.backend_token;
1381
1382        self.check_circuit_breaker()?;
1383
1384        let cluster_id = self
1385            .cluster_id_from_request(proxy.clone())
1386            .map_err(BackendConnectionError::RetrieveClusterError)?;
1387
1388        trace!(
1389            "{} Connect_to_backend: {:?} {:?} {:?}",
1390            log_context!(self),
1391            self.context.cluster_id,
1392            cluster_id,
1393            self.backend_connection_status
1394        );
1395        // check if we can reuse the backend connection
1396        if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
1397            && self.backend_connection_status == BackendConnectionStatus::Connected
1398        {
1399            let has_backend = self
1400                .backend
1401                .as_ref()
1402                .map(|backend| {
1403                    let backend = backend.borrow();
1404                    proxy
1405                        .borrow()
1406                        .backends()
1407                        .borrow()
1408                        .has_backend(&cluster_id, &backend)
1409                })
1410                .unwrap_or(false);
1411
1412            if has_backend && self.check_backend_connection(metrics) {
1413                return Ok(BackendConnectAction::Reuse);
1414            } else if self.backend_token.take().is_some() {
1415                self.close_backend(proxy.clone(), metrics);
1416            }
1417        }
1418
1419        // the request now targets another cluster, replace the backend connection
1420        if old_cluster_id.is_some()
1421            && old_cluster_id.as_ref() != Some(&cluster_id)
1422            && self.backend_token.take().is_some()
1423        {
1424            self.close_backend(proxy.clone(), metrics);
1425        }
1426
1427        self.context.cluster_id = Some(cluster_id.clone());
1428
1429        let frontend_should_stick = proxy
1430            .borrow()
1431            .clusters()
1432            .get(&cluster_id)
1433            .map(|cluster| cluster.sticky_session)
1434            .unwrap_or(false);
1435
1436        let mut socket =
1437            self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
1438        if let Err(e) = socket.set_nodelay(true) {
1439            error!(
1440                "{} Error setting nodelay on backend socket({:?}): {:?}",
1441                log_context!(self),
1442                socket,
1443                e
1444            );
1445        }
1446
1447        self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1448        self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());
1449
1450        match old_backend_token {
1451            Some(backend_token) => {
1452                self.set_backend_token(backend_token);
1453                if let Err(e) = proxy.borrow().register_socket(
1454                    &mut socket,
1455                    backend_token,
1456                    Interest::READABLE | Interest::WRITABLE,
1457                ) {
1458                    error!(
1459                        "{} Error registering back socket({:?}): {:?}",
1460                        log_context!(self),
1461                        socket,
1462                        e
1463                    );
1464                }
1465
1466                self.set_backend_socket(socket, self.backend.clone());
1467                self.set_backend_timeout(self.configured_connect_timeout);
1468
1469                Ok(BackendConnectAction::Replace)
1470            }
1471            None => {
1472                let backend_token = proxy.borrow().add_session(session_rc);
1473
1474                if let Err(e) = proxy.borrow().register_socket(
1475                    &mut socket,
1476                    backend_token,
1477                    Interest::READABLE | Interest::WRITABLE,
1478                ) {
1479                    error!(
1480                        "{} Error registering back socket({:?}): {:?}",
1481                        log_context!(self),
1482                        socket,
1483                        e
1484                    );
1485                }
1486
1487                self.set_backend_socket(socket, self.backend.clone());
1488                self.set_backend_token(backend_token);
1489                self.set_backend_timeout(self.configured_connect_timeout);
1490
1491                Ok(BackendConnectAction::New)
1492            }
1493        }
1494    }
1495
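    /// Record the new backend connection status. On the transition to Connected,
    /// update connection gauges, switch from the connect timeout to the backend
    /// timeout, emit a BackendUp event if the backend was marked down, and reset
    /// its failure counter and retry policy.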
1496    fn set_backend_connected(
1497        &mut self,
1498        connected: BackendConnectionStatus,
1499        metrics: &mut SessionMetrics,
1500    ) {
1501        let last = self.backend_connection_status;
1502        self.backend_connection_status = connected;
1503
1504        if connected == BackendConnectionStatus::Connected {
1505            gauge_add!("backend.connections", 1);
1506            gauge_add!(
1507                "connections_per_backend",
1508                1,
1509                self.context.cluster_id.as_deref(),
1510                metrics.backend_id.as_deref()
1511            );
1512
1513            // the backend timeout was set to the connect_timeout duration until now,
1514            // now that we're connected, switch to the backend_timeout duration
1515            self.set_backend_timeout(self.configured_backend_timeout);
1516            // if we are not waiting for the backend response, its timeout is cancelled;
1517            // it should be set again once the request has been entirely transmitted
1518            if !self.backend_readiness.interest.is_readable() {
1519                self.container_backend_timeout.cancel();
1520            }
1521
1522            if let Some(backend) = &self.backend {
1523                let mut backend = backend.borrow_mut();
1524
1525                if backend.retry_policy.is_down() {
1526                    incr!(
1527                        "backend.up",
1528                        self.context.cluster_id.as_deref(),
1529                        metrics.backend_id.as_deref()
1530                    );
1531
1532                    info!(
1533                        "backend server {} at {} is up",
1534                        backend.backend_id, backend.address
1535                    );
1536
1537                    push_event(Event {
1538                        kind: EventKind::BackendUp as i32,
1539                        backend_id: Some(backend.backend_id.to_owned()),
1540                        address: Some(backend.address.into()),
1541                        cluster_id: None,
1542                    });
1543                }
1544
1545                if let BackendConnectionStatus::Connecting(start) = last {
1546                    backend.set_connection_time(Instant::now() - start);
1547                }
1548
1549                // successful connection, reset the failure counter
1550                backend.failures = 0;
1551                backend.active_requests += 1;
1552                backend.retry_policy.succeed();
1553            }
1554        }
1555    }
1556
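    /// Account for a failed connection attempt: increment the backend's failure
    /// counter and error metrics and, if the retry policy just marked it down,
    /// log the incident and emit a BackendDown event.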
1557    fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1558        if let Some(backend) = &self.backend {
1559            let mut backend = backend.borrow_mut();
1560            backend.failures += 1;
1561
1562            let already_unavailable = backend.retry_policy.is_down();
1563            backend.retry_policy.fail();
1564            incr!(
1565                "backend.connections.error",
1566                self.context.cluster_id.as_deref(),
1567                metrics.backend_id.as_deref()
1568            );
1569
1570            if !already_unavailable && backend.retry_policy.is_down() {
1571                error!(
1572                    "{} backend server {} at {} is down",
1573                    log_context!(self),
1574                    backend.backend_id,
1575                    backend.address
1576                );
1577
1578                incr!(
1579                    "backend.down",
1580                    self.context.cluster_id.as_deref(),
1581                    metrics.backend_id.as_deref()
1582                );
1583
1584                push_event(Event {
1585                    kind: EventKind::BackendDown as i32,
1586                    backend_id: Some(backend.backend_id.to_owned()),
1587                    address: Some(backend.address.into()),
1588                    cluster_id: None,
1589                });
1590            }
1591        }
1592    }
1593
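    /// React to a hang-up or error on the backend socket: keep streaming if data
    /// remains readable or the response is already underway, answer 503 if the
    /// backend closed after consuming part of the request, or close the backend
    /// connection if nothing is in flight.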
1594    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> StateResult {
1595        let response_stream = match &mut self.response_stream {
1596            ResponseStream::BackendAnswer(response_stream) => response_stream,
1597            _ => return StateResult::CloseBackend,
1598        };
1599
1600        // there might still be data we can read on the socket
1601        if self.backend_readiness.event.is_readable()
1602            && self.backend_readiness.interest.is_readable()
1603        {
1604            return StateResult::Continue;
1605        }
1606
1607        // the backend finished answering, we can close
1608        if response_stream.is_terminated() {
1609            return StateResult::CloseBackend;
1610        }
1611        match (
1612            self.request_stream.is_initial(),
1613            response_stream.is_initial(),
1614        ) {
1615            // backend stopped before response is finished,
1616            // or maybe it was malformed in the first place (no Content-Length)
1617            (_, false) => {
1618                error!(
1619                    "{} Backend closed before session is over",
1620                    log_context!(self),
1621                );
1622
1623                trace!("{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.", log_context!(self));
1624
1625                response_stream.parsing_phase = kawa::ParsingPhase::Terminated;
1626
1627                // writable() will be called again and finish the session properly;
1628                // for this reason, writable must not short-circuit
1629                self.frontend_readiness.interest.insert(Ready::WRITABLE);
1630                StateResult::Continue
1631            }
1632            // the backend probably hung up between keep-alive requests, switch to another backend
1633            (true, true) => {
1634                trace!(
1635                    "{} Backend hung up in between requests",
1636                    log_context!(self)
1637                );
1638                StateResult::CloseBackend
1639            }
1640            // the frontend already transmitted data so we can't redirect
1641            (false, true) => {
1642                error!(
1643                    "{} Frontend transmitted data but the backend closed",
1644                    log_context!(self)
1645                );
1646
1647                self.set_answer(DefaultAnswer::Answer503 {
1648                    message: "Backend closed after consuming part of the request".into(),
1649                });
1650
1651                self.backend_readiness.interest = Ready::EMPTY;
1652                StateResult::Continue
1653            }
1654        }
1655    }
1656
1657    /// The main session loop, processing all events triggered by mio since last time
1658    /// and proxying http traffic. The main flow can be summed up by:
1659    ///
1660    /// - if we are connecting and the back socket has an event:
1661    ///   - if the backend hung up, try again
1662    ///   - else, mark it as connected
1663    /// - while front or back has event:
1664    ///   - read request on front
1665    ///   - write request to back
1666    ///   - read response on back
1667    ///   - write response to front
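    ///
    /// The loop is bounded by `MAX_LOOP_ITERATIONS`; reaching that bound is treated
    /// as a probable infinite loop and the session is closed.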
1668    fn ready_inner(
1669        &mut self,
1670        session: Rc<RefCell<dyn crate::ProxySession>>,
1671        proxy: Rc<RefCell<dyn L7Proxy>>,
1672        metrics: &mut SessionMetrics,
1673    ) -> SessionResult {
1674        let mut counter = 0;
1675
1676        if self.backend_connection_status.is_connecting()
1677            && !self.backend_readiness.event.is_empty()
1678        {
1679            if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
1680                // retry connecting to the backend
1681                error!(
1682                    "{} Error connecting to backend, trying again, attempt {}",
1683                    log_context!(self),
1684                    self.connection_attempts
1685                );
1686
1687                self.connection_attempts += 1;
1688                self.fail_backend_connection(metrics);
1689
1690                self.backend_connection_status =
1691                    BackendConnectionStatus::Connecting(Instant::now());
1692
1693                // trigger a backend reconnection
1694                self.close_backend(proxy.clone(), metrics);
1695
1696                let connection_result =
1697                    self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1698                if let Err(err) = &connection_result {
1699                    error!(
1700                        "{} Error connecting to backend: {}",
1701                        log_context!(self),
1702                        err
1703                    );
1704                }
1705
1706                if let Some(session_result) = handle_connection_result(connection_result) {
1707                    return session_result;
1708                }
1709            } else {
1710                metrics.backend_connected();
1711                self.connection_attempts = 0;
1712                self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
1713                // we might get an early response from the backend, so we want to look
1714                // at readable events
1715                self.backend_readiness.interest.insert(Ready::READABLE);
1716            }
1717        }
1718
1719        if self.frontend_readiness.event.is_hup() {
1720            if !self.request_stream.is_initial() {
1721                self.log_request_error(metrics, "Client disconnected abruptly");
1722            }
1723            return SessionResult::Close;
1724        }
1725
1726        while counter < MAX_LOOP_ITERATIONS {
1727            let frontend_interest = self.frontend_readiness.filter_interest();
1728            let backend_interest = self.backend_readiness.filter_interest();
1729
1730            trace!(
1731                "{} Frontend interest({:?}) and backend interest({:?})",
1732                log_context!(self),
1733                frontend_interest,
1734                backend_interest,
1735            );
1736
1737            if frontend_interest.is_empty() && backend_interest.is_empty() {
1738                break;
1739            }
1740
1741            if self.backend_readiness.event.is_hup()
1742                && self.frontend_readiness.interest.is_writable()
1743                && !self.frontend_readiness.event.is_writable()
1744            {
1745                break;
1746            }
1747
1748            if frontend_interest.is_readable() {
1749                let state_result = self.readable(metrics);
1750                trace!(
1751                    "{} frontend_readable: {:?}",
1752                    log_context!(self),
1753                    state_result
1754                );
1755
1756                match state_result {
1757                    StateResult::Continue => {}
1758                    StateResult::ConnectBackend => {
1759                        let connection_result =
1760                            self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1761                        if let Err(err) = &connection_result {
1762                            error!(
1763                                "{} Error connecting to backend: {}",
1764                                log_context!(self),
1765                                err
1766                            );
1767                        }
1768
1769                        if let Some(session_result) = handle_connection_result(connection_result) {
1770                            return session_result;
1771                        }
1772                    }
1773                    StateResult::CloseBackend => unreachable!(),
1774                    StateResult::CloseSession => return SessionResult::Close,
1775                    StateResult::Upgrade => return SessionResult::Upgrade,
1776                }
1777            }
1778
1779            if backend_interest.is_writable() {
1780                let session_result = self.backend_writable(metrics);
1781                trace!(
1782                    "{} backend_writable: {:?}",
1783                    log_context!(self),
1784                    session_result
1785                );
1786                if session_result != SessionResult::Continue {
1787                    return session_result;
1788                }
1789            }
1790
1791            if backend_interest.is_readable() {
1792                let session_result = self.backend_readable(metrics);
1793                trace!(
1794                    "{} backend_readable: {:?}",
1795                    log_context!(self),
1796                    session_result
1797                );
1798                if session_result != SessionResult::Continue {
1799                    return session_result;
1800                }
1801            }
1802
1803            if frontend_interest.is_writable() {
1804                let state_result = self.writable(metrics);
1805                trace!(
1806                    "{} frontend_writable: {:?}",
1807                    log_context!(self),
1808                    state_result
1809                );
1810                match state_result {
1811                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1812                    StateResult::CloseSession => return SessionResult::Close,
1813                    StateResult::Upgrade => return SessionResult::Upgrade,
1814                    StateResult::Continue => {}
1815                    StateResult::ConnectBackend => unreachable!(),
1816                }
1817            }
1818
1819            if frontend_interest.is_error() {
1820                error!(
1821                    "{} frontend socket error, disconnecting",
1822                    log_context!(self)
1823                );
1824
1825                return SessionResult::Close;
1826            }
1827
1828            if backend_interest.is_hup() || backend_interest.is_error() {
1829                let state_result = self.backend_hup(metrics);
1830
1831                trace!("{} backend_hup: {:?}", log_context!(self), state_result);
1832                match state_result {
1833                    StateResult::Continue => {}
1834                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1835                    StateResult::CloseSession => return SessionResult::Close,
1836                    StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
1837                }
1838            }
1839
1840            counter += 1;
1841        }
1842
1843        if counter >= MAX_LOOP_ITERATIONS {
1844            error!(
1845                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1846                log_context!(self), MAX_LOOP_ITERATIONS
1847            );
1848
1849            incr!("http.infinite_loop.error");
1850            self.print_state(self.protocol_string());
1851
1852            return SessionResult::Close;
1853        }
1854
1855        SessionResult::Continue
1856    }
1857
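    /// Attribute a pending timeout to a phase of the exchange, based on how far the
    /// request and response streams have progressed and whether the session already
    /// served requests in keep-alive.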
1858    pub fn timeout_status(&self) -> TimeoutStatus {
1859        if self.request_stream.is_main_phase() {
1860            match &self.response_stream {
1861                ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
1862                    TimeoutStatus::WaitingForResponse
1863                }
1864                _ => TimeoutStatus::Response,
1865            }
1866        } else if self.keepalive_count > 0 {
1867            TimeoutStatus::WaitingForNewRequest
1868        } else {
1869            TimeoutStatus::Request
1870        }
1871    }
1872}
1873
1874impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
1875    fn ready(
1876        &mut self,
1877        session: Rc<RefCell<dyn crate::ProxySession>>,
1878        proxy: Rc<RefCell<dyn L7Proxy>>,
1879        metrics: &mut SessionMetrics,
1880    ) -> SessionResult {
1881        let session_result = self.ready_inner(session, proxy, metrics);
1882        if session_result == SessionResult::Upgrade {
1883            let response_storage = match &mut self.response_stream {
1884                ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
1885                _ => return SessionResult::Close,
1886            };
1887
1888            // sync the underlying Checkout buffers; if they contain remaining data,
1889            // it will be processed once the session is upgraded to websocket
1890            self.request_stream.storage.buffer.sync(
1891                self.request_stream.storage.end,
1892                self.request_stream.storage.head,
1893            );
1894            response_storage
1895                .buffer
1896                .sync(response_storage.end, response_storage.head);
1897        }
1898        session_result
1899    }
1900
1901    fn update_readiness(&mut self, token: Token, events: Ready) {
1902        if self.frontend_token == token {
1903            self.frontend_readiness.event |= events;
1904        } else if self.backend_token == Some(token) {
1905            self.backend_readiness.event |= events;
1906        }
1907    }
1908
1909    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1910        self.close_backend(proxy, metrics);
1911        self.frontend_socket.socket_close();
1912        let _ = self.frontend_socket.socket_write_vectored(&[]);
1913
1914        // if the state was initial, the connection was already reset
1915        if !self.request_stream.is_initial() {
1916            gauge_add!("http.active_requests", -1);
1917
1918            if let Some(b) = self.backend.as_mut() {
1919                let mut backend = b.borrow_mut();
1920                backend.active_requests = backend.active_requests.saturating_sub(1);
1921            }
1922        }
1923    }
1924
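    /// Handle the expiration of the frontend or backend timer: depending on
    /// `timeout_status()`, answer with a 408 or 504 default page, keep waiting,
    /// or close the session.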
1925    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1926        //info!("got timeout for token: {:?}", token);
1927        if self.frontend_token == token {
1928            self.container_frontend_timeout.triggered();
1929            return match self.timeout_status() {
1930                // we do not have a complete request yet
1931                TimeoutStatus::Request => {
1932                    self.set_answer(DefaultAnswer::Answer408 {
1933                        duration: self.container_frontend_timeout.to_string(),
1934                    });
1935                    self.writable(metrics)
1936                }
1937                // we have a complete request but the response did not start
1938                TimeoutStatus::WaitingForResponse => {
1939                    // this case is ambiguous, as it is the frontend timeout that triggered while we were waiting for the response;
1940                    // the timeout responsibility should have switched to the backend before this point
1941                    self.set_answer(DefaultAnswer::Answer504 {
1942                        duration: self.container_backend_timeout.to_string(),
1943                    });
1944                    self.writable(metrics)
1945                }
1946                // we have a complete request and the start of a response, but the request was not tagged as terminated
1947                // for now we place the responsibility for the timeout on the backend in those cases, so we ignore this
1948                TimeoutStatus::Response => StateResult::Continue,
1949                // timeout in keep-alive, simply close the connection
1950                TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
1951            };
1952        }
1953
1954        if self.backend_token == Some(token) {
1955            //info!("backend timeout triggered for token {:?}", token);
1956            self.container_backend_timeout.triggered();
1957            return match self.timeout_status() {
1958                TimeoutStatus::Request => {
1959                    error!(
1960                        "got backend timeout while waiting for a request, this should not happen"
1961                    );
1962                    self.set_answer(DefaultAnswer::Answer504 {
1963                        duration: self.container_backend_timeout.to_string(),
1964                    });
1965                    self.writable(metrics)
1966                }
1967                TimeoutStatus::WaitingForResponse => {
1968                    self.set_answer(DefaultAnswer::Answer504 {
1969                        duration: self.container_backend_timeout.to_string(),
1970                    });
1971                    self.writable(metrics)
1972                }
1973                TimeoutStatus::Response => {
1974                    error!(
1975                        "backend {:?} timeout while receiving response (cluster {:?})",
1976                        self.context.backend_id, self.context.cluster_id
1977                    );
1978                    StateResult::CloseSession
1979                }
1980                // in keep-alive, we place the responsibility for the timeout on the frontend, so we ignore this
1981                TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
1982            };
1983        }
1984
1985        error!("{} Got timeout for an invalid token", log_context!(self));
1986        StateResult::CloseSession
1987    }
1988
1989    fn cancel_timeouts(&mut self) {
1990        self.container_backend_timeout.cancel();
1991        self.container_frontend_timeout.cancel();
1992    }
1993
1994    fn print_state(&self, context: &str) {
1995        error!(
1996            "\
1997{} {} Session(Kawa)
1998\tFrontend:
1999\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2000\tBackend:
2001\t\ttoken: {:?}\treadiness: {:?}",
2002            log_context!(self),
2003            context,
2004            self.frontend_token,
2005            self.frontend_readiness,
2006            self.request_stream.parsing_phase,
2007            self.backend_token,
2008            self.backend_readiness,
2009            // self.response_stream.parsing_phase
2010        );
2011    }
2012
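    /// When the proxy is shutting down, the session can be closed right away only
    /// if no request is in flight; otherwise it is marked as closing so it ends
    /// after the current exchange.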
2013    fn shutting_down(&mut self) -> SessionIsToBeClosed {
2014        if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2015        // && self.response_stream.storage.is_empty()
2016        {
2017            true
2018        } else {
2019            self.context.closing = true;
2020            false
2021        }
2022    }
2023}
2024
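/// Map the outcome of a backend connection attempt to an optional early return for
/// the session loop: `None` lets the loop keep processing (connection reused or a
/// default answer already set), while `Some(SessionResult::Continue)` yields until
/// mio reports an event on the freshly registered backend socket.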
2025fn handle_connection_result(
2026    connection_result: Result<BackendConnectAction, BackendConnectionError>,
2027) -> Option<SessionResult> {
2028    match connection_result {
2029        // reuse connection or send a default answer, we can continue
2030        Ok(BackendConnectAction::Reuse) => None,
2031        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2032            // we must wait for an event
2033            Some(SessionResult::Continue)
2034        }
2035        Err(_) => {
2036            // All BackendConnectionError already set a default answer
2037            // the session must continue to serve it
2038            // - NotFound: not used for http (only tcp)
2039            // - RetrieveClusterError: 301/400/401/404,
2040            // - MaxConnectionRetries: 503,
2041            // - Backend: 503,
2042            // - MaxSessionsMemory: not checked in connect_to_backend (TODO: check it?)
2043            None
2044        }
2045    }
2046}
2047
2048/// Save the HTTP status code of the backend response
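/// as an `http.status.*` counter, tagged with the cluster and backend identifiers
/// taken from the log context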
2049fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2050    if let Some(status) = status {
2051        match status {
2052            100..=199 => {
2053                incr!("http.status.1xx", context.cluster_id, context.backend_id);
2054            }
2055            200..=299 => {
2056                incr!("http.status.2xx", context.cluster_id, context.backend_id);
2057            }
2058            300..=399 => {
2059                incr!("http.status.3xx", context.cluster_id, context.backend_id);
2060            }
2061            400..=499 => {
2062                incr!("http.status.4xx", context.cluster_id, context.backend_id);
2063            }
2064            500..=599 => {
2065                incr!("http.status.5xx", context.cluster_id, context.backend_id);
2066            }
2067            _ => {
2068                // http responses with other codes (protocol error)
2069                incr!("http.status.other");
2070            }
2071        }
2072    }
2073}
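
// A minimal illustrative sketch of the mapping performed by `handle_connection_result`,
// assuming `SessionResult` implements `Debug` and `PartialEq` (it is compared with `!=`
// and traced with `{:?}` earlier in this module); not meant as an exhaustive test.
#[cfg(test)]
mod connection_result_mapping {
    use super::*;

    #[test]
    fn reuse_continues_while_new_connections_wait_for_an_event() {
        // Reusing an existing backend connection lets the session loop keep processing.
        assert_eq!(
            handle_connection_result(Ok(BackendConnectAction::Reuse)),
            None
        );
        // A new or replaced connection must yield until mio reports an event
        // on the freshly registered backend socket.
        assert_eq!(
            handle_connection_result(Ok(BackendConnectAction::New)),
            Some(SessionResult::Continue)
        );
        assert_eq!(
            handle_connection_result(Ok(BackendConnectAction::Replace)),
            Some(SessionResult::Continue)
        );
    }
}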