sozu_lib/protocol/kawa_h1/
mod.rs

pub mod answers;
pub mod diagnostics;
pub mod editor;
pub mod parser;

use std::{
    cell::RefCell,
    io::ErrorKind,
    net::{Shutdown, SocketAddr},
    rc::{Rc, Weak},
    time::{Duration, Instant},
};

use mio::{Interest, Token, net::TcpStream};
use rusty_ulid::Ulid;
use sozu_command::{
    config::MAX_LOOP_ITERATIONS,
    logging::EndpointRecord,
    proto::command::{Event, EventKind, ListenerType},
};

// use time::{Duration, Instant};
use crate::{
    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
    L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness,
    RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
    backends::{Backend, BackendError},
    pool::{Checkout, Pool},
    protocol::{
        SessionState,
        http::{
            answers::DefaultAnswerStream,
            diagnostics::{diagnostic_400_502, diagnostic_413_507},
            editor::HttpContext,
            parser::Method,
        },
        pipe::WebSocketContext,
    },
    retry::RetryPolicy,
    router::Route,
    server::{CONN_RETRIES, push_event},
    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
    sozu_command::{logging::LogContext, ready::Ready},
    timer::TimeoutContainer,
};

/// This macro is defined only in this module, to help track kawa h1
/// issues inside Sōzu
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "KAWA-H1\t{}\tSession(public={}, session={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
            $self.context.log_context(),
            $self.context.public_address.to_string(),
            $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness,
            $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.backend_readiness,
        )
    };
}
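// A typical call site in this module (see `readable` below) looks like:
//     trace!("{} ============== readable", log_context!(self));
// so every trace/debug/warn line carries the same session context prefix.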

/// Generic HTTP representation from the Kawa crate, using Sozu's Checkout as the underlying buffer
type GenericHttpStream = kawa::Kawa<Checkout>;

impl kawa::AsBuffer for Checkout {
    fn as_buffer(&self) -> &[u8] {
        self.inner.extra()
    }
    fn as_mut_buffer(&mut self) -> &mut [u8] {
        self.inner.extra_mut()
    }
}

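/// Expose the pooled buffer behind a Checkout to kawa as a plain byte slice,
/// so request and response streams can be parsed in place.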
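/// Default answers served directly by the proxy, one variant per HTTP status code,
/// carrying the details needed to build the response (see the `answers` module).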
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DefaultAnswer {
    Answer301 {
        location: String,
    },
    Answer400 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    Answer401 {},
    Answer404 {},
    Answer408 {
        duration: String,
    },
    Answer413 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        capacity: usize,
    },
    Answer502 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    Answer503 {
        message: String,
    },
    Answer504 {
        duration: String,
    },
    Answer507 {
        phase: kawa::ParsingPhaseMarker,
        message: String,
        capacity: usize,
    },
}

impl From<&DefaultAnswer> for u16 {
    fn from(answer: &DefaultAnswer) -> u16 {
        match answer {
            DefaultAnswer::Answer301 { .. } => 301,
            DefaultAnswer::Answer400 { .. } => 400,
            DefaultAnswer::Answer401 { .. } => 401,
            DefaultAnswer::Answer404 { .. } => 404,
            DefaultAnswer::Answer408 { .. } => 408,
            DefaultAnswer::Answer413 { .. } => 413,
            DefaultAnswer::Answer502 { .. } => 502,
            DefaultAnswer::Answer503 { .. } => 503,
            DefaultAnswer::Answer504 { .. } => 504,
            DefaultAnswer::Answer507 { .. } => 507,
        }
    }
}

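// A minimal sketch of the mapping above, as used by `set_answer` (illustrative only):
//
//     let answer = DefaultAnswer::Answer503 { message: "no backend available".to_string() };
//     assert_eq!(u16::from(&answer), 503);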
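/// Distinguishes which part of the request/response exchange the session is
/// currently waiting on when a timeout fires.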
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    Request,
    Response,
    WaitingForNewRequest,
    WaitingForResponse,
}

pub enum ResponseStream {
    BackendAnswer(GenericHttpStream),
    DefaultAnswer(u16, DefaultAnswerStream),
}

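/// What will be written back to the client: either the stream proxied from the
/// backend, or a pre-built default answer tagged with its HTTP status code.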
/// Http will be contained in State which itself is contained by Session
pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    answers: Rc<RefCell<answers::HttpAnswers>>,
    pub backend: Option<Rc<RefCell<Backend>>>,
    backend_connection_status: BackendConnectionStatus,
    pub backend_readiness: Readiness,
    pub backend_socket: Option<TcpStream>,
    backend_stop: Option<Instant>,
    pub backend_token: Option<Token>,
    pub container_backend_timeout: TimeoutContainer,
    pub container_frontend_timeout: TimeoutContainer,
    configured_backend_timeout: Duration,
    configured_connect_timeout: Duration,
    configured_frontend_timeout: Duration,
    /// number of attempts to connect to the backends during this session
    connection_attempts: u8,
    pub frontend_readiness: Readiness,
    pub frontend_socket: Front,
    frontend_token: Token,
    keepalive_count: usize,
    listener: Rc<RefCell<L>>,
    pub request_stream: GenericHttpStream,
    pub response_stream: ResponseStream,
    /// The HTTP context was separated from the State for borrowing reasons.
    /// Calling a kawa parser mutably borrows the State through request_stream or response_stream,
    /// so Http can't be borrowed again to be used in callbacks. HttpContext is an independent
    /// subsection of Http that can be mutably borrowed for parser callbacks.
    pub context: HttpContext,
}

impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
    /// Instantiate a new HTTP SessionState with:
    ///
    /// - frontend_interest: READABLE | HUP | ERROR
    /// - frontend_event: EMPTY
    /// - backend_interest: EMPTY
    /// - backend_event: EMPTY
    ///
    /// Remember to set the events from the previous State!
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        answers: Rc<RefCell<answers::HttpAnswers>>,
        configured_backend_timeout: Duration,
        configured_connect_timeout: Duration,
        configured_frontend_timeout: Duration,
        container_frontend_timeout: TimeoutContainer,
        frontend_socket: Front,
        frontend_token: Token,
        listener: Rc<RefCell<L>>,
        pool: Weak<RefCell<Pool>>,
        protocol: Protocol,
        public_address: SocketAddr,
        request_id: Ulid,
        session_address: Option<SocketAddr>,
        sticky_name: String,
    ) -> Result<Http<Front, L>, AcceptError> {
        let (front_buffer, back_buffer) = match pool.upgrade() {
            Some(pool) => {
                let mut pool = pool.borrow_mut();
                match (pool.checkout(), pool.checkout()) {
                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
                    _ => return Err(AcceptError::BufferCapacityReached),
                }
            }
            None => return Err(AcceptError::BufferCapacityReached),
        };
        Ok(Http {
            answers,
            backend_connection_status: BackendConnectionStatus::NotConnected,
            backend_readiness: Readiness::new(),
            backend_socket: None,
            backend_stop: None,
            backend_token: None,
            backend: None,
            configured_backend_timeout,
            configured_connect_timeout,
            configured_frontend_timeout,
            connection_attempts: 0,
            container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
            container_frontend_timeout,
            frontend_readiness: Readiness {
                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
                event: Ready::EMPTY,
            },
            frontend_socket,
            frontend_token,
            keepalive_count: 0,
            listener,
            request_stream: GenericHttpStream::new(
                kawa::Kind::Request,
                kawa::Buffer::new(front_buffer),
            ),
            response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
                kawa::Kind::Response,
                kawa::Buffer::new(back_buffer),
            )),
            context: HttpContext::new(
                request_id,
                protocol,
                public_address,
                session_address,
                sticky_name,
            ),
        })
    }

    /// Reset the connection in case of keep-alive to be ready for the next request
    pub fn reset(&mut self) {
        trace!("{} ============== reset", log_context!(self));
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return,
        };

        self.context.id = Ulid::generate();
        self.context.reset();

        self.request_stream.clear();
        response_stream.clear();
        self.keepalive_count += 1;
        gauge_add!("http.active_requests", -1);

        if let Some(backend) = &mut self.backend {
            let mut backend = backend.borrow_mut();
            backend.active_requests = backend.active_requests.saturating_sub(1);
        }

        // reset the front timeout and cancel the back timeout while we are
        // waiting for a new request
        self.container_backend_timeout.cancel();
        self.container_frontend_timeout
            .set_duration(self.configured_frontend_timeout);
        self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;

        // We are resetting the offsets of the request and response stream buffers.
        // We do have to keep the cursor position on the request if there is data
        // left in the request stream, to preserve HTTP pipelining.

        // Print any left-over response buffer content to track in which cases it
        // may happen
        let response_storage = &mut response_stream.storage;
        if !response_storage.is_empty() {
            warn!(
                "{} Leftover fragment from response: {}",
                log_context!(self),
                parser::view(
                    response_storage.used(),
                    16,
                    &[response_storage.start, response_storage.end,],
                )
            );
        }

        response_storage.clear();
        if !self.request_stream.storage.is_empty() {
            self.frontend_readiness.event.insert(Ready::READABLE);
        } else {
            self.request_stream.storage.clear();
        }
    }

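    /// Read from the frontend socket into the request stream, let kawa parse what
    /// was received, and report whether the session should connect to a backend,
    /// keep going, or be closed.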
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== readable", log_context!(self));
        if !self.container_frontend_timeout.reset() {
            error!(
                "could not reset front timeout {:?}",
                self.configured_frontend_timeout
            );
            self.print_state(self.protocol_string());
        }

        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            ResponseStream::DefaultAnswer(..) => {
                error!(
                    "{} Sending default answer, should not read from frontend socket",
                    log_context!(self)
                );

                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return StateResult::Continue;
            }
        };

        if self.request_stream.storage.is_full() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            if self.request_stream.is_main_phase() {
                self.backend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // the client has filled its buffer and we can't empty it
                self.set_answer(DefaultAnswer::Answer413 {
                    capacity: self.request_stream.storage.capacity(),
                    phase: self.request_stream.parsing_phase.marker(),
                    message: diagnostic_413_507(self.request_stream.parsing_phase),
                });
            }
            return StateResult::Continue;
        }

        let (size, socket_state) = self
            .frontend_socket
            .socket_read(self.request_stream.storage.space());

        debug!("{} Read {} bytes", log_context!(self), size);

        if size > 0 {
            self.request_stream.storage.fill(size);
            count!("bytes_in", size as i64);
            metrics.bin += size;
            // if self.kawa_request.storage.is_full() {
            //     self.frontend_readiness.interest.remove(Ready::READABLE);
            // }
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                if self.request_stream.is_initial() {
                    // count an error if we were waiting for the first request
                    // otherwise, if we already had one completed request and response,
                    // and are waiting for the next one, we do not count a socket
                    // closing abruptly as an error
                    if self.keepalive_count == 0 {
                        self.frontend_socket.read_error();
                    }
                } else {
                    self.frontend_socket.read_error();
                    self.log_request_error(
                        metrics,
                        &format!(
                            "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
                            self.frontend_readiness,
                            self.backend_readiness,
                        )
                    );
                }
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        trace!("{} ============== readable_parse", log_context!(self));
        let was_initial = self.request_stream.is_initial();
        let was_not_proxying = !self.request_stream.is_main_phase();

        kawa::h1::parse(&mut self.request_stream, &mut self.context);
        // kawa::debug_kawa(&self.request_stream);

        if was_initial && !self.request_stream.is_initial() {
            // if it was the first request, the front timeout duration
            // was set to request_timeout, which is much lower. For future
            // requests on this connection, we can wait a bit more
            self.container_frontend_timeout
                .set_duration(self.configured_frontend_timeout);
            gauge_add!("http.active_requests", 1);
            incr!("http.requests");
        }

        if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
            incr!("http.frontend_parse_errors");
            warn!(
                "{} Parsing request error in {:?}: {}",
                log_context!(self),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        let kawa = &self.request_stream;
                        parser::view(
                            kawa.storage.used(),
                            16,
                            &[
                                kawa.storage.start,
                                kawa.storage.head,
                                index as usize,
                                kawa.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            if response_stream.consumed {
                self.log_request_error(metrics, "Parsing error on the request");
                return StateResult::CloseSession;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, &self.request_stream);
                self.set_answer(DefaultAnswer::Answer400 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return StateResult::Continue;
            }
        }

        if self.request_stream.is_main_phase() {
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            if was_not_proxying {
                // Sozu tries to connect only once all the headers have been gathered and edited;
                // this could be improved
                trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
                return StateResult::ConnectBackend;
            }
        }
        if self.request_stream.is_terminated() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
        }

        StateResult::Continue
    }

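    /// Flush the response stream to the frontend socket. Once the response is
    /// terminated and fully written, decide between keep-alive, closing the
    /// backend, closing the session, or upgrading (e.g. to websocket).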
    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== writable", log_context!(self));
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return self.writable_default_answer(metrics),
        };

        response_stream.prepare(&mut kawa::h1::BlockConverter);

        let bufs = response_stream.as_io_slice();
        if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
            self.frontend_readiness.interest.remove(Ready::WRITABLE);
            // do not shortcut, response might have been terminated without anything more to send
        }

        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);

        debug!("{} Wrote {} bytes", log_context!(self), size);

        if size > 0 {
            response_stream.consume(size);
            count!("bytes_out", size as i64);
            metrics.bout += size;
            self.backend_readiness.interest.insert(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                self.frontend_socket.write_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "front socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        if self.frontend_socket.socket_wants_write() {
            return StateResult::Continue;
        }

        if response_stream.is_terminated() && response_stream.is_completed() {
            if self.context.closing {
                debug!("{} closing proxy, no keep alive", log_context!(self));
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }

            match response_stream.detached.status_line {
                kawa::StatusLine::Response { code: 101, .. } => {
                    trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
                    self.log_request_success(metrics);
                    return StateResult::Upgrade;
                }
                kawa::StatusLine::Response { code: 100, .. } => {
                    trace!("{} ============== HANDLE CONTINUE!", log_context!(self));
                    response_stream.clear();
                    self.log_request_success(metrics);
                    return StateResult::Continue;
                }
                kawa::StatusLine::Response { code: 103, .. } => {
                    self.backend_readiness.event.insert(Ready::READABLE);
                    trace!("{} ============== HANDLE EARLY HINT!", log_context!(self));
                    response_stream.clear();
                    self.log_request_success(metrics);
                    return StateResult::Continue;
                }
                _ => (),
            }

            let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
            let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
            if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
                && request_length_known
            {
                error!(
                    "{} Response terminated before request, this case is not handled properly yet",
                    log_context!(self)
                );
                incr!("http.early_response_close");
                // FIXME: this will cause problems with pipelining
                // return StateResult::CloseSession;
            }

            // FIXME: we could get smarter about this
            // with no keepalive on the backend, we could open a new backend connection
            // with no keepalive on the front but keepalive on the backend, we could have
            // a pool of connections
            trace!(
                "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
                log_context!(self),
                self.context.keep_alive_frontend,
                self.context.keep_alive_backend,
                response_length_known
            );

            self.log_request_success(metrics);
            return match (
                self.context.keep_alive_frontend,
                self.context.keep_alive_backend,
                response_length_known,
            ) {
                (true, true, true) => {
                    debug!("{} Keep alive frontend/backend", log_context!(self));
                    metrics.reset();
                    self.reset();
                    StateResult::Continue
                }
                (true, false, true) => {
                    debug!("{} Keep alive frontend", log_context!(self));
                    metrics.reset();
                    self.reset();
                    StateResult::CloseBackend
                }
                _ => {
                    debug!("{} No keep alive", log_context!(self));
                    StateResult::CloseSession
                }
            };
        }
        StateResult::Continue
    }

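    /// Write the pre-built default answer to the frontend socket; the session is
    /// closed once the answer has been fully sent.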
    fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!(
            "{} ============== writable_default_answer",
            log_context!(self)
        );
        let response_stream = match &mut self.response_stream {
            ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
            _ => return StateResult::CloseSession,
        };
        let bufs = response_stream.as_io_slice();
        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);

        count!("bytes_out", size as i64);
        metrics.bout += size;
        response_stream.consume(size);

        if size == 0 || socket_state != SocketResult::Continue {
            self.frontend_readiness.event.remove(Ready::WRITABLE);
        }

        if response_stream.is_completed() {
            save_http_status_metric(self.context.status, self.context.log_context());
            self.log_default_answer_success(metrics);
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            return StateResult::CloseSession;
        }

        if socket_state == SocketResult::Error {
            self.frontend_socket.write_error();
            self.log_request_error(
                metrics,
                "error writing default answer to front socket, closing",
            );
            StateResult::CloseSession
        } else {
            StateResult::Continue
        }
    }

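    /// Forward the (possibly edited) request stream to the backend socket. Once the
    /// request is fully transmitted, timeout responsibility moves to the backend.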
    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_writable", log_context!(self));
        if let ResponseStream::DefaultAnswer(..) = self.response_stream {
            error!(
                "{}\tsending default answer, should not write to back",
                log_context!(self)
            );
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        self.request_stream.prepare(&mut kawa::h1::BlockConverter);

        let bufs = self.request_stream.as_io_slice();
        if bufs.is_empty() {
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
        debug!("{} Wrote {} bytes", log_context!(self), size);

        if size > 0 {
            self.request_stream.consume(size);
            count!("back_bytes_out", size as i64);
            metrics.backend_bout += size;
            self.frontend_readiness.interest.insert(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::READABLE);
        } else {
            self.backend_readiness.event.remove(Ready::WRITABLE);
        }

        match socket_state {
            // the back socket is not writable anymore, so we can drop
            // the front buffer, no more data can be transmitted.
            // But the socket might still be readable, or if it is
            // closed, we might still have some data in the buffer.
            // As an example, we can get an early response for a large
            // POST request to refuse it and prevent all of the data
            // from being transmitted, with the backend server closing
            // the socket right after sending the response
            // FIXME: shouldn't we test for response_state then?
            SocketResult::Error | SocketResult::Closed => {
                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.backend_readiness.interest.remove(Ready::WRITABLE);
                return SessionResult::Continue;
            }
            SocketResult::WouldBlock => {
                self.backend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        if self.request_stream.is_terminated() && self.request_stream.is_completed() {
            self.backend_readiness.interest.remove(Ready::WRITABLE);

            // cancel the front timeout while we are waiting for the server to answer
            self.container_frontend_timeout.cancel();
            self.container_backend_timeout.reset();
        }
        SessionResult::Continue
    }

    // Read content from cluster
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_readable", log_context!(self));
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset back timeout {:?}",
                log_context!(self),
                self.configured_backend_timeout
            );
            self.print_state(self.protocol_string());
        }

        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => {
                error!(
                    "{} Sending default answer, should not read from backend socket",
                    log_context!(self),
                );

                self.backend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return SessionResult::Continue;
            }
        };

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        if response_stream.storage.is_full() {
            self.backend_readiness.interest.remove(Ready::READABLE);
            if response_stream.is_main_phase() {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // server has filled its buffer and we can't empty it
                let capacity = response_stream.storage.capacity();
                let phase = response_stream.parsing_phase.marker();
                let message = diagnostic_413_507(response_stream.parsing_phase);
                self.set_answer(DefaultAnswer::Answer507 {
                    capacity,
                    phase,
                    message,
                });
            }
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
        debug!("{} Read {} bytes", log_context!(self), size);

        if size > 0 {
            response_stream.storage.fill(size);
            count!("back_bytes_in", size as i64);
            metrics.backend_bin += size;
            // if self.kawa_response.storage.is_full() {
            //     self.backend_readiness.interest.remove(Ready::READABLE);
            // }

            // In case the response starts while the request is not tagged as "terminated",
            // we place the timeout responsibility on the backend.
            // This can happen when:
            // - the request is malformed and doesn't have length information
            // - kawa fails to detect a properly terminated request (e.g. a GET request with no body and no length)
            // - the response can start before the end of the request (e.g. stream processing like compression)
            self.container_frontend_timeout.cancel();
        } else {
            self.backend_readiness.event.remove(Ready::READABLE);
        }

        // TODO: close delimited and backend_hup should be handled better
        match socket_state {
            SocketResult::Error => {
                backend_socket.read_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "back socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return SessionResult::Close;
            }
            SocketResult::WouldBlock | SocketResult::Closed => {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        }

        trace!(
            "{} ============== backend_readable_parse",
            log_context!(self)
        );
        kawa::h1::parse(response_stream, &mut self.context);
        // kawa::debug_kawa(&self.response_stream);

        if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
            incr!("http.backend_parse_errors");
            warn!(
                "{} Parsing response error in {:?}: {}",
                log_context!(self),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        parser::view(
                            response_stream.storage.used(),
                            16,
                            &[
                                response_stream.storage.start,
                                response_stream.storage.head,
                                index as usize,
                                response_stream.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            if response_stream.consumed {
                return SessionResult::Close;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, response_stream);
                self.set_answer(DefaultAnswer::Answer502 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return SessionResult::Continue;
            }
        }

        if response_stream.is_main_phase() {
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
        }
        if response_stream.is_terminated() {
            metrics.backend_stop();
            self.backend_stop = Some(Instant::now());
            self.backend_readiness.interest.remove(Ready::READABLE);
        }
        SessionResult::Continue
    }
}

impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
    fn log_endpoint(&self) -> EndpointRecord<'_> {
        EndpointRecord::Http {
            method: self.context.method.as_deref(),
            authority: self.context.authority.as_deref(),
            path: self.context.path.as_deref(),
            reason: self.context.reason.as_deref(),
            status: self.context.status,
        }
    }

    pub fn get_session_address(&self) -> Option<SocketAddr> {
        self.context
            .session_address
            .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
    }

    pub fn get_backend_address(&self) -> Option<SocketAddr> {
        self.backend
            .as_ref()
            .map(|backend| backend.borrow().address)
            .or_else(|| {
                self.backend_socket
                    .as_ref()
                    .and_then(|backend| backend.peer_addr().ok())
            })
    }

    // The protocol name used in the access logs
    fn protocol_string(&self) -> &'static str {
        match self.context.protocol {
            Protocol::HTTP => "HTTP",
            Protocol::HTTPS => match self.frontend_socket.protocol() {
                TransportProtocol::Ssl2 => "HTTPS-SSL2",
                TransportProtocol::Ssl3 => "HTTPS-SSL3",
                TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
                TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
                TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
                TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }

    /// Capture the HTTP context of the session into a loggable WebSocketContext
    pub fn websocket_context(&self) -> WebSocketContext {
        WebSocketContext::Http {
            method: self.context.method.clone(),
            authority: self.context.authority.clone(),
            path: self.context.path.clone(),
            reason: self.context.reason.clone(),
            status: self.context.status,
        }
    }

    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        let listener = self.listener.borrow();
        let tags = self.context.authority.as_ref().and_then(|host| {
            let hostname = match host.split_once(':') {
                None => host,
                Some((hostname, _)) => hostname,
            };
            listener.get_tags(hostname)
        });

        let context = self.context.log_context();
        metrics.register_end_of_session(&context);

        log_access! {
            error,
            on_failure: { incr!("unsent-access-logs") },
            message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint: self.log_endpoint(),
            tags,
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            user_agent: self.context.user_agent.as_deref(),
            #[cfg(feature = "opentelemetry")]
            otel: self.context.otel.as_ref(),
            #[cfg(not(feature = "opentelemetry"))]
            otel: None,
        };
    }

    pub fn log_request_success(&self, metrics: &SessionMetrics) {
        save_http_status_metric(self.context.status, self.context.log_context());
        self.log_request(metrics, false, None);
    }

    pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, false, None);
    }

    pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
        incr!("http.errors");
        error!(
            "{} Could not process request properly, got: {}",
            log_context!(self),
            message
        );
        self.print_state(self.protocol_string());
        self.log_request(metrics, true, Some(message));
    }

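    /// Replace the response stream with a pre-built default answer for the given
    /// status, count it in the metrics, and switch readiness so the answer is
    /// written to the frontend instead of proxying from the backend.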
    pub fn set_answer(&mut self, answer: DefaultAnswer) {
        let status = u16::from(&answer);
        if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
            error!(
                "already set the default answer to {}, trying to set to {}",
                old_status, status
            );
        } else {
            match answer {
                DefaultAnswer::Answer301 { .. } => incr!(
                    "http.301.redirection",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer400 { .. } => incr!("http.400.errors"),
                DefaultAnswer::Answer401 { .. } => incr!(
                    "http.401.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer404 { .. } => incr!("http.404.errors"),
                DefaultAnswer::Answer408 { .. } => incr!(
                    "http.408.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer413 { .. } => incr!(
                    "http.413.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer502 { .. } => incr!(
                    "http.502.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer503 { .. } => incr!(
                    "http.503.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer504 { .. } => incr!(
                    "http.504.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer507 { .. } => incr!(
                    "http.507.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
            };
        }

        let mut kawa = self.answers.borrow().get(
            answer,
            self.context.id.to_string(),
            self.context.cluster_id.as_deref(),
            self.context.backend_id.as_deref(),
            self.get_route(),
        );
        kawa.prepare(&mut kawa::h1::BlockConverter);
        self.context.status = Some(status);
        self.context.reason = None;
        self.context.keep_alive_frontend = false;
        self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
        self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
    }

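    /// Peek one byte on the backend socket to check that a kept-alive connection
    /// is still usable: 0 bytes read means the peer closed its side (EOF),
    /// WouldBlock means the connection is idle but alive.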
    pub fn test_backend_socket(&self) -> bool {
        match self.backend_socket {
            Some(ref s) => {
                let mut tmp = [0u8; 1];
                let res = s.peek(&mut tmp[..]);

                match res {
                    // if the socket is half open, it will report 0 bytes read (EOF)
                    Ok(0) => false,
                    Ok(_) => true,
                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
                }
            }
            None => false,
        }
    }

    pub fn is_valid_backend_socket(&self) -> bool {
        // if the socket was not used in the last second, test it
        match self.backend_stop.as_ref() {
            Some(stop_instant) => {
                let now = Instant::now();
                let dur = now - *stop_instant;
                if dur > Duration::from_secs(1) {
                    return self.test_backend_socket();
                }
            }
            None => return self.test_backend_socket(),
        }

        true
    }

    pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
        self.backend_socket = Some(socket);
        self.backend = backend;
    }

    pub fn set_cluster_id(&mut self, cluster_id: String) {
        self.context.cluster_id = Some(cluster_id);
    }

    pub fn set_backend_id(&mut self, backend_id: String) {
        self.context.backend_id = Some(backend_id);
    }

    pub fn set_backend_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }

    pub fn clear_backend_token(&mut self) {
        self.backend_token = None;
    }

    pub fn set_backend_timeout(&mut self, dur: Duration) {
        if let Some(token) = self.backend_token.as_ref() {
            self.container_backend_timeout.set_duration(dur);
            self.container_backend_timeout.set(*token);
        }
    }

    pub fn front_socket(&self) -> &TcpStream {
        self.frontend_socket.socket_ref()
    }

    /// WARNING: this function removes the backend entry in the session manager
    /// IF the backend_token is set, so that the entry can be reused for a new backend.
    /// I don't think this is a good idea, but it is a quick fix
    fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
        self.container_backend_timeout.cancel();
        debug!(
            "{}\tPROXY [{}->{}] CLOSED BACKEND",
            log_context!(self),
            self.frontend_token.0,
            self.backend_token
                .map(|t| format!("{}", t.0))
                .unwrap_or_else(|| "-".to_string())
        );

        let proxy = proxy.borrow();
        if let Some(socket) = &mut self.backend_socket.take() {
            if let Err(e) = proxy.deregister_socket(socket) {
                error!(
                    "{} Error deregistering back socket({:?}): {:?}",
                    log_context!(self),
                    socket,
                    e
                );
            }
            if let Err(e) = socket.shutdown(Shutdown::Both) {
                if e.kind() != ErrorKind::NotConnected {
                    error!(
                        "{} Error shutting down back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }
            }
        }

        if let Some(token) = self.backend_token.take() {
            proxy.remove_session(token);

            if self.backend_connection_status != BackendConnectionStatus::NotConnected {
                self.backend_readiness.event = Ready::EMPTY;
            }

            if self.backend_connection_status == BackendConnectionStatus::Connected {
                gauge_add!("backend.connections", -1);
                gauge_add!(
                    "connections_per_backend",
                    -1,
                    self.context.cluster_id.as_deref(),
                    metrics.backend_id.as_deref()
                );
            }

            self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);

            if let Some(backend) = self.backend.take() {
                backend.borrow_mut().dec_connections();
            }
        }
    }

    /// Check the number of connection attempts against authorized connection retries
    fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
        if self.connection_attempts >= CONN_RETRIES {
            error!(
                "{} Max connection attempt reached ({})",
                log_context!(self),
                self.connection_attempts,
            );

            self.set_answer(DefaultAnswer::Answer503 {
                message: format!(
                    "Max connection attempt reached: {}",
                    self.connection_attempts
                ),
            });
            return Err(BackendConnectionError::MaxConnectionRetries(None));
        }
        Ok(())
    }

    fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
        let is_valid_backend_socket = self.is_valid_backend_socket();

        if !is_valid_backend_socket {
            return false;
        }

        // matched on keepalive
        metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());

        metrics.backend_start();
        metrics.backend_connected();
        if let Some(b) = self.backend.as_mut() {
            b.borrow_mut().active_requests += 1;
        }
        true
    }

    // -> (authority, path, method)
    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
        let given_method = self
            .context
            .method
            .as_ref()
            .ok_or(RetrieveClusterError::NoMethod)?;
        let given_authority = self
            .context
            .authority
            .as_deref()
            .ok_or(RetrieveClusterError::NoHost)?;
        let given_path = self
            .context
            .path
            .as_deref()
            .ok_or(RetrieveClusterError::NoPath)?;

        Ok((given_authority, given_path, given_method))
    }

    pub fn get_route(&self) -> String {
        if let Some(method) = &self.context.method {
            if let Some(authority) = &self.context.authority {
                if let Some(path) = &self.context.path {
                    return format!("{method} {authority}{path}");
                }
                return format!("{method} {authority}");
            }
            return format!("{method}");
        }
        String::new()
    }

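    /// Best-effort "METHOD authority/path" string for logs and default answers,
    /// e.g. "GET example.com/index.html" when all three parts are known.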
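    /// Resolve which cluster should handle the parsed request: extract the route,
    /// ask the listener for the matching frontend, and handle deny/redirect cases
    /// by setting the appropriate default answer.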
    fn cluster_id_from_request(
        &mut self,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<String, RetrieveClusterError> {
        let (host, uri, method) = match self.extract_route() {
            Ok(tuple) => tuple,
            Err(cluster_error) => {
                self.set_answer(DefaultAnswer::Answer400 {
                    message: "Could not extract the route after connection started, this should not happen.".into(),
                    phase: self.request_stream.parsing_phase.marker(),
                    successfully_parsed: "null".into(),
                    partially_parsed: "null".into(),
                    invalid: "null".into(),
                });
                return Err(cluster_error);
            }
        };

        let route_result = self
            .listener
            .borrow()
            .frontend_from_request(host, uri, method);

        let route = match route_result {
            Ok(route) => route,
            Err(frontend_error) => {
                self.set_answer(DefaultAnswer::Answer404 {});
                return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
            }
        };

        let cluster_id = match route {
            Route::ClusterId(cluster_id) => cluster_id,
            Route::Deny => {
                self.set_answer(DefaultAnswer::Answer401 {});
                return Err(RetrieveClusterError::UnauthorizedRoute);
            }
        };

        let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
            && proxy
                .borrow()
                .clusters()
                .get(&cluster_id)
                .map(|cluster| cluster.https_redirect)
                .unwrap_or(false);

        if frontend_should_redirect_https {
            self.set_answer(DefaultAnswer::Answer301 {
                location: format!("https://{host}{uri}"),
            });
            return Err(RetrieveClusterError::UnauthorizedRoute);
        }

        Ok(cluster_id)
    }

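    /// Pick a backend for the resolved cluster (honouring sticky sessions when the
    /// cluster requires them) and open a TCP connection to it, recording the
    /// backend id in the session metrics.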
    pub fn backend_from_request(
        &mut self,
        cluster_id: &str,
        frontend_should_stick: bool,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<TcpStream, BackendConnectionError> {
        let (backend, conn) = self
            .get_backend_for_sticky_session(
                frontend_should_stick,
                self.context.sticky_session_found.as_deref(),
                cluster_id,
                proxy,
            )
            .map_err(|backend_error| {
                // some backend errors are actually retryable
                // TODO: maybe retry or return a different default answer
                self.set_answer(DefaultAnswer::Answer503 {
                    message: backend_error.to_string(),
                });
                BackendConnectionError::Backend(backend_error)
            })?;

        if frontend_should_stick {
            // update sticky name in case it changed I guess?
            self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();

            self.context.sticky_session = Some(
                backend
                    .borrow()
                    .sticky_id
                    .clone()
                    .unwrap_or_else(|| backend.borrow().backend_id.clone()),
            );
        }

        metrics.backend_id = Some(backend.borrow().backend_id.clone());
        metrics.backend_start();
        self.set_backend_id(backend.borrow().backend_id.clone());

        self.backend = Some(backend);
        Ok(conn)
    }

    fn get_backend_for_sticky_session(
        &self,
        frontend_should_stick: bool,
        sticky_session: Option<&str>,
        cluster_id: &str,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
        match (frontend_should_stick, sticky_session) {
            (true, Some(sticky_session)) => proxy
                .borrow()
                .backends()
                .borrow_mut()
                .backend_from_sticky_session(cluster_id, sticky_session),
            _ => proxy
                .borrow()
                .backends()
                .borrow_mut()
                .backend_from_cluster_id(cluster_id),
        }
    }

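    /// Establish (or reuse) the backend connection for the current request: apply
    /// the circuit breaker, resolve the cluster, reuse a still-valid keep-alive
    /// connection when possible, otherwise open and register a new socket.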
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        let old_cluster_id = self.context.cluster_id.clone();
        let old_backend_token = self.backend_token;

        self.check_circuit_breaker()?;

        let cluster_id = self
            .cluster_id_from_request(proxy.clone())
            .map_err(BackendConnectionError::RetrieveClusterError)?;

        trace!(
            "{} Connect_to_backend: {:?} {:?} {:?}",
            log_context!(self),
            self.context.cluster_id,
            cluster_id,
            self.backend_connection_status
        );
        // check if we can reuse the backend connection
        if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
            && self.backend_connection_status == BackendConnectionStatus::Connected
        {
            let has_backend = self
                .backend
                .as_ref()
                .map(|backend| {
                    let backend = backend.borrow();
                    proxy
                        .borrow()
                        .backends()
                        .borrow()
                        .has_backend(&cluster_id, &backend)
                })
                .unwrap_or(false);

            if has_backend && self.check_backend_connection(metrics) {
                return Ok(BackendConnectAction::Reuse);
            } else if self.backend_token.take().is_some() {
                self.close_backend(proxy.clone(), metrics);
            }
        }

        // replacing with a connection to another cluster
        if old_cluster_id.is_some()
            && old_cluster_id.as_ref() != Some(&cluster_id)
            && self.backend_token.take().is_some()
        {
            self.close_backend(proxy.clone(), metrics);
        }

        self.context.cluster_id = Some(cluster_id.clone());

        let frontend_should_stick = proxy
            .borrow()
            .clusters()
            .get(&cluster_id)
            .map(|cluster| cluster.sticky_session)
            .unwrap_or(false);

        let mut socket =
            self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
        if let Err(e) = socket.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on backend socket({:?}): {:?}",
                log_context!(self),
                socket,
                e
            );
        }

        self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());

        match old_backend_token {
            Some(backend_token) => {
                self.set_backend_token(backend_token);
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::Replace)
            }
            None => {
                let backend_token = proxy.borrow().add_session(session_rc);

                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
1466                    backend_token,
1467                    Interest::READABLE | Interest::WRITABLE,
1468                ) {
1469                    error!(
1470                        "{} Error registering back socket({:?}): {:?}",
1471                        log_context!(self),
1472                        socket,
1473                        e
1474                    );
1475                }
1476
1477                self.set_backend_socket(socket, self.backend.clone());
1478                self.set_backend_token(backend_token);
1479                self.set_backend_timeout(self.configured_connect_timeout);
1480
1481                Ok(BackendConnectAction::New)
1482            }
1483        }
1484    }
1485
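    /// Record the new backend connection status.
    ///
    /// On transition to `Connected`: update the connection gauges, switch the backend
    /// timeout from the connect timeout to the regular backend timeout, emit a
    /// `BackendUp` event if the backend was considered down, store the connection
    /// time and reset the failure counter of the retry policy.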
1486    fn set_backend_connected(
1487        &mut self,
1488        connected: BackendConnectionStatus,
1489        metrics: &mut SessionMetrics,
1490    ) {
1491        let last = self.backend_connection_status;
1492        self.backend_connection_status = connected;
1493
1494        if connected == BackendConnectionStatus::Connected {
1495            gauge_add!("backend.connections", 1);
1496            gauge_add!(
1497                "connections_per_backend",
1498                1,
1499                self.context.cluster_id.as_deref(),
1500                metrics.backend_id.as_deref()
1501            );
1502
1503            // the backend timeout was set to the connect_timeout duration before,
1504            // now that we're connected, move to the backend_timeout duration
1505            self.set_backend_timeout(self.configured_backend_timeout);
1506            // if we are not waiting for the backend response yet, its timeout is cancelled,
1507            // it should be set again once the request has been entirely transmitted
1508            if !self.backend_readiness.interest.is_readable() {
1509                self.container_backend_timeout.cancel();
1510            }
1511
1512            if let Some(backend) = &self.backend {
1513                let mut backend = backend.borrow_mut();
1514
1515                if backend.retry_policy.is_down() {
1516                    incr!(
1517                        "backend.up",
1518                        self.context.cluster_id.as_deref(),
1519                        metrics.backend_id.as_deref()
1520                    );
1521
1522                    info!(
1523                        "backend server {} at {} is up",
1524                        backend.backend_id, backend.address
1525                    );
1526
1527                    push_event(Event {
1528                        kind: EventKind::BackendUp as i32,
1529                        backend_id: Some(backend.backend_id.to_owned()),
1530                        address: Some(backend.address.into()),
1531                        cluster_id: None,
1532                    });
1533                }
1534
1535                if let BackendConnectionStatus::Connecting(start) = last {
1536                    backend.set_connection_time(Instant::now() - start);
1537                }
1538
1539                //successful connection, reset failure counter
1540                backend.failures = 0;
1541                backend.active_requests += 1;
1542                backend.retry_policy.succeed();
1543            }
1544        }
1545    }
1546
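    /// Account for a failed connection attempt on the current backend.
    ///
    /// Increments the backend failure counter and its retry policy; if this failure
    /// marks the backend as down, a `BackendDown` event is pushed and the
    /// `backend.down` metric is incremented.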
1547    fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1548        if let Some(backend) = &self.backend {
1549            let mut backend = backend.borrow_mut();
1550            backend.failures += 1;
1551
1552            let already_unavailable = backend.retry_policy.is_down();
1553            backend.retry_policy.fail();
1554            incr!(
1555                "backend.connections.error",
1556                self.context.cluster_id.as_deref(),
1557                metrics.backend_id.as_deref()
1558            );
1559
1560            if !already_unavailable && backend.retry_policy.is_down() {
1561                error!(
1562                    "{} backend server {} at {} is down",
1563                    log_context!(self),
1564                    backend.backend_id,
1565                    backend.address
1566                );
1567
1568                incr!(
1569                    "backend.down",
1570                    self.context.cluster_id.as_deref(),
1571                    metrics.backend_id.as_deref()
1572                );
1573
1574                push_event(Event {
1575                    kind: EventKind::BackendDown as i32,
1576                    backend_id: Some(backend.backend_id.to_owned()),
1577                    address: Some(backend.address.into()),
1578                    cluster_id: None,
1579                });
1580            }
1581        }
1582    }
1583
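    /// Decide what to do when the backend socket hangs up, depending on how far the
    /// request and response streams have progressed:
    ///
    /// - readable events are still pending on the backend: keep processing
    /// - the response is terminated: close the backend
    /// - the response has started: force its parsing phase to terminated and finish
    ///   writing it to the frontend
    /// - nothing was exchanged yet (hang-up between keep-alive requests): close the backend
    /// - the request was partially sent but no response arrived: answer 503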
1584    pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
1585        let response_stream = match &mut self.response_stream {
1586            ResponseStream::BackendAnswer(response_stream) => response_stream,
1587            _ => return StateResult::CloseBackend,
1588        };
1589
1590        // there might still be data we can read on the socket
1591        if self.backend_readiness.event.is_readable()
1592            && self.backend_readiness.interest.is_readable()
1593        {
1594            return StateResult::Continue;
1595        }
1596
1597        // the backend finished answering, we can close
1598        if response_stream.is_terminated() {
1599            return StateResult::CloseBackend;
1600        }
1601        match (
1602            self.request_stream.is_initial(),
1603            response_stream.is_initial(),
1604        ) {
1605            // the backend stopped before the response was finished,
1606            // or maybe the response was malformed in the first place (no Content-Length)
1607            (_, false) => {
1608                error!(
1609                    "{} Backend closed before session is over",
1610                    log_context!(self),
1611                );
1612
1613                trace!(
1614                    "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
1615                    log_context!(self)
1616                );
1617
1618                response_stream.parsing_phase = kawa::ParsingPhase::Terminated;
1619
1620                // writable() will be called again and finish the session properly
1621                // for this reason, writable must not short-circuit
1622                self.frontend_readiness.interest.insert(Ready::WRITABLE);
1623                StateResult::Continue
1624            }
1625            // the backend probably hung up between keep-alive requests, change backend
1626            (true, true) => {
1627                trace!(
1628                    "{} Backend hung up in between requests",
1629                    log_context!(self)
1630                );
1631                StateResult::CloseBackend
1632            }
1633            // the frontend already transmitted data so we can't redirect
1634            (false, true) => {
1635                error!(
1636                    "{} Frontend transmitted data but the backend closed",
1637                    log_context!(self)
1638                );
1639
1640                self.set_answer(DefaultAnswer::Answer503 {
1641                    message: "Backend closed after consuming part of the request".into(),
1642                });
1643
1644                self.backend_readiness.interest = Ready::EMPTY;
1645                StateResult::Continue
1646            }
1647        }
1648    }
1649
1650    /// The main session loop, processing all events triggered by mio since last time
1651    /// and proxying http traffic. The main flow can be summed up as:
1652    ///
1653    /// - if we are connecting and the back has an event:
1654    ///   - if the backend hung up, try again
1655    ///   - else, set it as connected
1656    /// - while front or back has event:
1657    ///   - read request on front
1658    ///   - write request to back
1659    ///   - read response on back
1660    ///   - write response to front
1661    fn ready_inner(
1662        &mut self,
1663        session: Rc<RefCell<dyn crate::ProxySession>>,
1664        proxy: Rc<RefCell<dyn L7Proxy>>,
1665        metrics: &mut SessionMetrics,
1666    ) -> SessionResult {
1667        let mut counter = 0;
1668
1669        if self.backend_connection_status.is_connecting()
1670            && !self.backend_readiness.event.is_empty()
1671        {
1672            if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
1673                //retry connecting the backend
1674                error!(
1675                    "{} Error connecting to backend, trying again, attempt {}",
1676                    log_context!(self),
1677                    self.connection_attempts
1678                );
1679
1680                self.connection_attempts += 1;
1681                self.fail_backend_connection(metrics);
1682
1683                self.backend_connection_status =
1684                    BackendConnectionStatus::Connecting(Instant::now());
1685
1686                // trigger a backend reconnection
1687                self.close_backend(proxy.clone(), metrics);
1688
1689                let connection_result =
1690                    self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1691                if let Err(err) = &connection_result {
1692                    error!(
1693                        "{} Error connecting to backend: {}",
1694                        log_context!(self),
1695                        err
1696                    );
1697                }
1698
1699                if let Some(session_result) = handle_connection_result(connection_result) {
1700                    return session_result;
1701                }
1702            } else {
1703                metrics.backend_connected();
1704                self.connection_attempts = 0;
1705                self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
1706                // we might get an early response from the backend, so we want to look
1707                // at readable events
1708                self.backend_readiness.interest.insert(Ready::READABLE);
1709            }
1710        }
1711
1712        if self.frontend_readiness.event.is_hup() {
1713            if !self.request_stream.is_initial() {
1714                self.log_request_error(metrics, "Client disconnected abruptly");
1715            }
1716            return SessionResult::Close;
1717        }
1718
1719        while counter < MAX_LOOP_ITERATIONS {
1720            let frontend_interest = self.frontend_readiness.filter_interest();
1721            let backend_interest = self.backend_readiness.filter_interest();
1722
1723            trace!(
1724                "{} Frontend interest({:?}) and backend interest({:?})",
1725                log_context!(self),
1726                frontend_interest,
1727                backend_interest,
1728            );
1729
1730            if frontend_interest.is_empty() && backend_interest.is_empty() {
1731                break;
1732            }
1733
1734            if self.backend_readiness.event.is_hup()
1735                && self.frontend_readiness.interest.is_writable()
1736                && !self.frontend_readiness.event.is_writable()
1737            {
1738                break;
1739            }
1740
1741            if frontend_interest.is_readable() {
1742                let state_result = self.readable(metrics);
1743                trace!(
1744                    "{} frontend_readable: {:?}",
1745                    log_context!(self),
1746                    state_result
1747                );
1748
1749                match state_result {
1750                    StateResult::Continue => {}
1751                    StateResult::ConnectBackend => {
1752                        let connection_result =
1753                            self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1754                        if let Err(err) = &connection_result {
1755                            error!(
1756                                "{} Error connecting to backend: {}",
1757                                log_context!(self),
1758                                err
1759                            );
1760                        }
1761
1762                        if let Some(session_result) = handle_connection_result(connection_result) {
1763                            return session_result;
1764                        }
1765                    }
1766                    StateResult::CloseBackend => unreachable!(),
1767                    StateResult::CloseSession => return SessionResult::Close,
1768                    StateResult::Upgrade => return SessionResult::Upgrade,
1769                }
1770            }
1771
1772            if backend_interest.is_writable() {
1773                let session_result = self.backend_writable(metrics);
1774                trace!(
1775                    "{} backend_writable: {:?}",
1776                    log_context!(self),
1777                    session_result
1778                );
1779                if session_result != SessionResult::Continue {
1780                    return session_result;
1781                }
1782            }
1783
1784            if backend_interest.is_readable() {
1785                let session_result = self.backend_readable(metrics);
1786                trace!(
1787                    "{} backend_readable: {:?}",
1788                    log_context!(self),
1789                    session_result
1790                );
1791                if session_result != SessionResult::Continue {
1792                    return session_result;
1793                }
1794            }
1795
1796            if frontend_interest.is_writable() {
1797                let state_result = self.writable(metrics);
1798                trace!(
1799                    "{} frontend_writable: {:?}",
1800                    log_context!(self),
1801                    state_result
1802                );
1803                match state_result {
1804                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1805                    StateResult::CloseSession => return SessionResult::Close,
1806                    StateResult::Upgrade => return SessionResult::Upgrade,
1807                    StateResult::Continue => {}
1808                    StateResult::ConnectBackend => unreachable!(),
1809                }
1810            }
1811
1812            if frontend_interest.is_error() {
1813                error!(
1814                    "{} frontend socket error, disconnecting",
1815                    log_context!(self)
1816                );
1817
1818                return SessionResult::Close;
1819            }
1820
1821            if backend_interest.is_hup() || backend_interest.is_error() {
1822                let state_result = self.backend_hup(metrics);
1823
1824                trace!("{} backend_hup: {:?}", log_context!(self), state_result);
1825                match state_result {
1826                    StateResult::Continue => {}
1827                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1828                    StateResult::CloseSession => return SessionResult::Close,
1829                    StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
1830                }
1831            }
1832
1833            counter += 1;
1834        }
1835
1836        if counter >= MAX_LOOP_ITERATIONS {
1837            error!(
1838                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1839                log_context!(self),
1840                MAX_LOOP_ITERATIONS
1841            );
1842
1843            incr!("http.infinite_loop.error");
1844            self.print_state(self.protocol_string());
1845
1846            return SessionResult::Close;
1847        }
1848
1849        SessionResult::Continue
1850    }
1851
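    /// Tell which timeout case we are in, based on the parsing phase of the streams:
    ///
    /// - the request did not reach its main phase yet: `Request`
    /// - the request is in its main phase but the response is still initial:
    ///   `WaitingForResponse`
    /// - the response has started: `Response`
    /// - the request is initial but previous requests were served on this connection
    ///   (keep-alive): `WaitingForNewRequest`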
1852    pub fn timeout_status(&self) -> TimeoutStatus {
1853        if self.request_stream.is_main_phase() {
1854            match &self.response_stream {
1855                ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
1856                    TimeoutStatus::WaitingForResponse
1857                }
1858                _ => TimeoutStatus::Response,
1859            }
1860        } else if self.keepalive_count > 0 {
1861            TimeoutStatus::WaitingForNewRequest
1862        } else {
1863            TimeoutStatus::Request
1864        }
1865    }
1866}
1867
1868impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
1869    fn ready(
1870        &mut self,
1871        session: Rc<RefCell<dyn crate::ProxySession>>,
1872        proxy: Rc<RefCell<dyn L7Proxy>>,
1873        metrics: &mut SessionMetrics,
1874    ) -> SessionResult {
1875        let session_result = self.ready_inner(session, proxy, metrics);
1876        if session_result == SessionResult::Upgrade {
1877            let response_storage = match &mut self.response_stream {
1878                ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
1879                _ => return SessionResult::Close,
1880            };
1881
1882            // sync the underlying Checkout buffers, if they contain remaining data
1883            // it will be processed once upgraded to websocket
1884            self.request_stream.storage.buffer.sync(
1885                self.request_stream.storage.end,
1886                self.request_stream.storage.head,
1887            );
1888            response_storage
1889                .buffer
1890                .sync(response_storage.end, response_storage.head);
1891        }
1892        session_result
1893    }
1894
1895    fn update_readiness(&mut self, token: Token, events: Ready) {
1896        if self.frontend_token == token {
1897            self.frontend_readiness.event |= events;
1898        } else if self.backend_token == Some(token) {
1899            self.backend_readiness.event |= events;
1900        }
1901    }
1902
1903    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1904        self.close_backend(proxy, metrics);
1905        self.frontend_socket.socket_close();
1906        let _ = self.frontend_socket.socket_write_vectored(&[]);
1907
1908        //if the state was initial, the connection was already reset
1909        if !self.request_stream.is_initial() {
1910            gauge_add!("http.active_requests", -1);
1911
1912            if let Some(b) = self.backend.as_mut() {
1913                let mut backend = b.borrow_mut();
1914                backend.active_requests = backend.active_requests.saturating_sub(1);
1915            }
1916        }
1917    }
1918
1919    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1920        //info!("got timeout for token: {:?}", token);
1921        if self.frontend_token == token {
1922            self.container_frontend_timeout.triggered();
1923            return match self.timeout_status() {
1924                // we did not receive a complete request
1925                TimeoutStatus::Request => {
1926                    self.set_answer(DefaultAnswer::Answer408 {
1927                        duration: self.container_frontend_timeout.to_string(),
1928                    });
1929                    self.writable(metrics)
1930                }
1931                // we received a complete request but the response did not start
1932                TimeoutStatus::WaitingForResponse => {
1933                    // this case is ambiguous: the frontend timeout triggered while we were waiting for the response,
1934                    // the timeout responsibility should have been switched to the backend before this point
1935                    self.set_answer(DefaultAnswer::Answer504 {
1936                        duration: self.container_backend_timeout.to_string(),
1937                    });
1938                    self.writable(metrics)
1939                }
1940                // we received a complete request and the start of a response, but the request was not tagged as terminated
1941                // for now we place the timeout responsibility on the backend in those cases, so we ignore this
1942                TimeoutStatus::Response => StateResult::Continue,
1943                // timeout in keep-alive, simply close the connection
1944                TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
1945            };
1946        }
1947
1948        if self.backend_token == Some(token) {
1949            //info!("backend timeout triggered for token {:?}", token);
1950            self.container_backend_timeout.triggered();
1951            return match self.timeout_status() {
1952                TimeoutStatus::Request => {
1953                    error!(
1954                        "got backend timeout while waiting for a request, this should not happen"
1955                    );
1956                    self.set_answer(DefaultAnswer::Answer504 {
1957                        duration: self.container_backend_timeout.to_string(),
1958                    });
1959                    self.writable(metrics)
1960                }
1961                TimeoutStatus::WaitingForResponse => {
1962                    self.set_answer(DefaultAnswer::Answer504 {
1963                        duration: self.container_backend_timeout.to_string(),
1964                    });
1965                    self.writable(metrics)
1966                }
1967                TimeoutStatus::Response => {
1968                    error!(
1969                        "backend {:?} timeout while receiving response (cluster {:?})",
1970                        self.context.backend_id, self.context.cluster_id
1971                    );
1972                    StateResult::CloseSession
1973                }
1974                // in keep-alive, we place the timeout responsibility on the frontend, so we ignore this
1975                TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
1976            };
1977        }
1978
1979        error!("{} Got timeout for an invalid token", log_context!(self));
1980        StateResult::CloseSession
1981    }
1982
1983    fn cancel_timeouts(&mut self) {
1984        self.container_backend_timeout.cancel();
1985        self.container_frontend_timeout.cancel();
1986    }
1987
1988    fn print_state(&self, context: &str) {
1989        error!(
1990            "\
1991{} {} Session(Kawa)
1992\tFrontend:
1993\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
1994\tBackend:
1995\t\ttoken: {:?}\treadiness: {:?}",
1996            log_context!(self),
1997            context,
1998            self.frontend_token,
1999            self.frontend_readiness,
2000            self.request_stream.parsing_phase,
2001            self.backend_token,
2002            self.backend_readiness,
2003            // self.response_stream.parsing_phase
2004        );
2005    }
2006
2007    fn shutting_down(&mut self) -> SessionIsToBeClosed {
2008        if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2009        // && self.response_stream.storage.is_empty()
2010        {
2011            true
2012        } else {
2013            self.context.closing = true;
2014            false
2015        }
2016    }
2017}
2018
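/// Translate the result of a backend connection attempt into an early return for
/// `ready_inner`: `None` means the event loop can keep processing the session
/// (connection reused, or a default answer is pending), `Some(Continue)` means we
/// must yield and wait for the connect event on the new socket.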
2019fn handle_connection_result(
2020    connection_result: Result<BackendConnectAction, BackendConnectionError>,
2021) -> Option<SessionResult> {
2022    match connection_result {
2023        // reuse connection or send a default answer, we can continue
2024        Ok(BackendConnectAction::Reuse) => None,
2025        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2026            // we must wait for an event
2027            Some(SessionResult::Continue)
2028        }
2029        Err(_) => {
2030            // All BackendConnectionError already set a default answer
2031            // the session must continue to serve it
2032            // - NotFound: not used for http (only tcp)
2033            // - RetrieveClusterError: 301/400/401/404,
2034            // - MaxConnectionRetries: 503,
2035            // - Backend: 503,
2036            // - MaxSessionsMemory: not checked in connect_to_backend (TODO: check it?)
2037            None
2038        }
2039    }
2040}
2041
2042/// Record a metric based on the HTTP status code of the backend response
2043fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2044    if let Some(status) = status {
2045        match status {
2046            100..=199 => {
2047                incr!("http.status.1xx", context.cluster_id, context.backend_id);
2048            }
2049            200..=299 => {
2050                incr!("http.status.2xx", context.cluster_id, context.backend_id);
2051            }
2052            300..=399 => {
2053                incr!("http.status.3xx", context.cluster_id, context.backend_id);
2054            }
2055            400..=499 => {
2056                incr!("http.status.4xx", context.cluster_id, context.backend_id);
2057            }
2058            500..=599 => {
2059                incr!("http.status.5xx", context.cluster_id, context.backend_id);
2060            }
2061            _ => {
2062                // http responses with other codes (protocol error)
2063                incr!("http.status.other");
2064            }
2065        }
2066    }
2067}