sozu_lib/
tcp.rs

1use std::{
2    cell::RefCell,
3    collections::{hash_map::Entry, BTreeMap, HashMap},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    net::TcpListener as MioTcpListener, net::TcpStream as MioTcpStream, unix::SourceFd, Interest,
13    Registry, Token,
14};
15use rusty_ulid::Ulid;
16
17use sozu_command::{
18    config::MAX_LOOP_ITERATIONS,
19    logging::{EndpointRecord, LogContext},
20    proto::command::request::RequestType,
21    ObjectKind,
22};
23
24use crate::{
25    backends::{Backend, BackendMap},
26    pool::{Checkout, Pool},
27    protocol::{
28        pipe::WebSocketContext,
29        proxy_protocol::{
30            expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
31        },
32        Pipe,
33    },
34    retry::RetryPolicy,
35    server::{push_event, ListenToken, SessionManager, CONN_RETRIES},
36    socket::{server_bind, stats::socket_rtt},
37    sozu_command::{
38        proto::command::{
39            Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
40            WorkerRequest, WorkerResponse,
41        },
42        ready::Ready,
43        state::ClusterId,
44    },
45    timer::TimeoutContainer,
46    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
47    ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
48    Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
49};
50
// NOTE(review): the code in this module also matches on a
// `TcpStateMachine::FailedUpgrade(_)` variant and calls `take()` and
// `front_socket()` on the state, none of which are declared in the enum below.
// They are presumably generated by `StateMachineBuilder!` — confirm against the
// macro definition.
StateMachineBuilder! {
    /// The various Stages of a TCP connection:
    ///
    /// 1. optional (ExpectProxyProtocol | SendProxyProtocol | RelayProxyProtocol)
    /// 2. Pipe
    enum TcpStateMachine {
        // Plain byte forwarding; built directly by `TcpSession::new` when no proxy
        // protocol is configured, otherwise reached through `TcpSession::upgrade`.
        Pipe(Pipe<MioTcpStream, TcpListener>),
        // Initial state when the listener is configured with `ProxyProtocolConfig::SendHeader`.
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        // Initial state when the listener is configured with `ProxyProtocolConfig::RelayHeader`.
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        // Initial state when the listener is configured with `ProxyProtocolConfig::ExpectHeader`.
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
63
/// Builds the prefix put in front of the log lines emitted by a TCP session.
///
/// The prefix is made of the literal `TCP`, the value returned by
/// `$self.log_context()`, and the frontend and backend token values (the backend
/// token is rendered as `<none>` when it is not set).
///
/// `$self` must expose a `log_context()` method and the `frontend_token` and
/// `backend_token` fields.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "TCP\t{}\tSession(frontend={}, backend={})\t >>>",
            $self.log_context(),
            $self.frontend_token.0,
            $self
                .backend_token
                .map(|token| token.0.to_string())
                .unwrap_or_else(|| "<none>".to_string()),
        )
    };
}
79
/// State of one proxied TCP connection: the frontend socket (owned by `state`),
/// the optional backend connection and the bookkeeping around both.
pub struct TcpSession {
    /// Backend buffer kept aside while the session is in a proxy-protocol state;
    /// it is handed to the `Pipe` when the session is upgraded.
    backend_buffer: Option<Checkout>,
    /// Current status of the backend connection.
    backend_connected: BackendConnectionStatus,
    /// Identifier of the backend, used in the log context.
    backend_id: Option<String>,
    /// Token under which the backend socket is stored in the session slab.
    backend_token: Option<Token>,
    /// Backend this session is attached to, if any.
    backend: Option<Rc<RefCell<Backend>>>,
    /// Identifier of the cluster, used in logs and metrics.
    cluster_id: Option<String>,
    /// Duration applied to the backend timeout once the backend is connected.
    configured_backend_timeout: Duration,
    /// Number of failed backend connection attempts, compared to `CONN_RETRIES`.
    connection_attempt: u8,
    /// Backend timeout; created with the connect timeout duration.
    container_backend_timeout: TimeoutContainer,
    /// Frontend timeout, bound to `frontend_token`.
    container_frontend_timeout: TimeoutContainer,
    /// Peer address of the frontend socket, read when the session is created.
    frontend_address: Option<SocketAddr>,
    /// Frontend buffer kept aside while the session is in a proxy-protocol state
    /// that does not own it; handed to the `Pipe` on upgrade.
    frontend_buffer: Option<Checkout>,
    /// Token of the frontend socket.
    frontend_token: Token,
    /// Initialized to `false`.
    // NOTE(review): presumably set once the session has been closed — not visible here.
    has_been_closed: SessionIsToBeClosed,
    /// Initialized to the creation time of the session.
    // NOTE(review): presumably refreshed on every event — not visible here.
    last_event: Instant,
    /// Listener that accepted the frontend connection.
    listener: Rc<RefCell<TcpListener>>,
    /// Metrics of the session, reported in the access log.
    metrics: SessionMetrics,
    /// Proxy owning the registry, the session slab and the backends.
    proxy: Rc<RefCell<TcpProxy>>,
    /// Unique identifier generated when the session is created.
    request_id: Ulid,
    /// Current stage of the connection.
    state: TcpStateMachine,
}
102
103impl TcpSession {
104    #[allow(clippy::too_many_arguments)]
105    fn new(
106        backend_buffer: Checkout,
107        backend_id: Option<String>,
108        cluster_id: Option<String>,
109        configured_backend_timeout: Duration,
110        configured_connect_timeout: Duration,
111        configured_frontend_timeout: Duration,
112        frontend_buffer: Checkout,
113        frontend_token: Token,
114        listener: Rc<RefCell<TcpListener>>,
115        proxy_protocol: Option<ProxyProtocolConfig>,
116        proxy: Rc<RefCell<TcpProxy>>,
117        socket: MioTcpStream,
118        wait_time: Duration,
119    ) -> TcpSession {
120        let frontend_address = socket.peer_addr().ok();
121        let mut frontend_buffer_session = None;
122        let mut backend_buffer_session = None;
123
124        let request_id = Ulid::generate();
125
126        let container_frontend_timeout =
127            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
128        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);
129
130        let state = match proxy_protocol {
131            Some(ProxyProtocolConfig::RelayHeader) => {
132                backend_buffer_session = Some(backend_buffer);
133                gauge_add!("protocol.proxy.relay", 1);
134                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
135                    socket,
136                    frontend_token,
137                    request_id,
138                    None,
139                    frontend_buffer,
140                ))
141            }
142            Some(ProxyProtocolConfig::ExpectHeader) => {
143                frontend_buffer_session = Some(frontend_buffer);
144                backend_buffer_session = Some(backend_buffer);
145                gauge_add!("protocol.proxy.expect", 1);
146                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
147                    container_frontend_timeout.clone(),
148                    socket,
149                    frontend_token,
150                    request_id,
151                ))
152            }
153            Some(ProxyProtocolConfig::SendHeader) => {
154                frontend_buffer_session = Some(frontend_buffer);
155                backend_buffer_session = Some(backend_buffer);
156                gauge_add!("protocol.proxy.send", 1);
157                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
158                    socket,
159                    frontend_token,
160                    request_id,
161                    None,
162                ))
163            }
164            None => {
165                gauge_add!("protocol.tcp", 1);
166                let mut pipe = Pipe::new(
167                    backend_buffer,
168                    backend_id.clone(),
169                    None,
170                    None,
171                    None,
172                    None,
173                    cluster_id.clone(),
174                    frontend_buffer,
175                    frontend_token,
176                    socket,
177                    listener.clone(),
178                    Protocol::TCP,
179                    request_id,
180                    frontend_address,
181                    WebSocketContext::Tcp,
182                );
183                pipe.set_cluster_id(cluster_id.clone());
184                TcpStateMachine::Pipe(pipe)
185            }
186        };
187
188        let metrics = SessionMetrics::new(Some(wait_time));
189        //FIXME: timeout usage
190
191        TcpSession {
192            backend_buffer: backend_buffer_session,
193            backend_connected: BackendConnectionStatus::NotConnected,
194            backend_id,
195            backend_token: None,
196            backend: None,
197            cluster_id,
198            configured_backend_timeout,
199            connection_attempt: 0,
200            container_backend_timeout,
201            container_frontend_timeout,
202            frontend_address,
203            frontend_buffer: frontend_buffer_session,
204            frontend_token,
205            has_been_closed: false,
206            last_event: Instant::now(),
207            listener,
208            metrics,
209            proxy,
210            request_id,
211            state,
212        }
213    }
214
/// Registers the end-of-session metrics and emits the access log of the session.
///
/// The backend address, the server RTT and the user agent are always logged as
/// `None`. The `unsent-access-logs` counter is incremented by the `on_failure`
/// handler of the access log macro.
fn log_request(&self) {
    let listener = self.listener.borrow();
    let context = self.log_context();
    self.metrics.register_end_of_session(&context);
    info_access!(
        on_failure: { incr!("unsent-access-logs") },
        message: None,
        context,
        session_address: self.frontend_address,
        backend_address: None,
        protocol: "TCP",
        endpoint: EndpointRecord::Tcp,
        // tags are looked up with the listener address rendered as a string
        tags: listener.get_tags(&listener.get_addr().to_string()),
        client_rtt: socket_rtt(self.state.front_socket()),
        server_rtt: None,
        user_agent: None,
        service_time: self.metrics.service_time(),
        response_time: self.metrics.backend_response_time(),
        request_time: self.metrics.request_time(),
        bytes_in: self.metrics.bin,
        bytes_out: self.metrics.bout
    );
}
238
239    fn front_hup(&mut self) -> SessionResult {
240        match &mut self.state {
241            TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
242            _ => {
243                self.log_request();
244                SessionResult::Close
245            }
246        }
247    }
248
249    fn back_hup(&mut self) -> SessionResult {
250        match &mut self.state {
251            TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
252            _ => {
253                self.log_request();
254                SessionResult::Close
255            }
256        }
257    }
258
259    fn log_context(&self) -> LogContext {
260        LogContext {
261            request_id: self.request_id,
262            cluster_id: self.cluster_id.as_deref(),
263            backend_id: self.backend_id.as_deref(),
264        }
265    }
266
267    fn readable(&mut self) -> SessionResult {
268        if !self.container_frontend_timeout.reset() {
269            error!(
270                "{} Could not reset frontend timeout on readable",
271                log_context!(self)
272            );
273        }
274        if self.backend_connected == BackendConnectionStatus::Connected
275            && !self.container_backend_timeout.reset()
276        {
277            error!(
278                "{} Could not reset backend timeout on readable",
279                log_context!(self)
280            );
281        }
282        match &mut self.state {
283            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
284            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
285            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
286            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
287            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
288        }
289    }
290
291    fn writable(&mut self) -> SessionResult {
292        match &mut self.state {
293            TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
294            _ => SessionResult::Continue,
295        }
296    }
297
298    fn back_readable(&mut self) -> SessionResult {
299        if !self.container_frontend_timeout.reset() {
300            error!(
301                "{} Could not reset frontend timeout on back_readable",
302                log_context!(self)
303            );
304        }
305        if !self.container_backend_timeout.reset() {
306            error!(
307                "{} Could not reset backend timeout on back_readable",
308                log_context!(self)
309            );
310        }
311
312        match &mut self.state {
313            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
314            _ => SessionResult::Continue,
315        }
316    }
317
318    fn back_writable(&mut self) -> SessionResult {
319        match &mut self.state {
320            TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
321            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
322            TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
323            TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
324            TcpStateMachine::FailedUpgrade(_) => {
325                unreachable!()
326            }
327        }
328    }
329
330    fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
331        match &mut self.state {
332            TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
333            TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
334            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
335            TcpStateMachine::ExpectProxyProtocol(_) => None,
336            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
337        }
338    }
339
340    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
341        let new_state = match self.state.take() {
342            TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
343            TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
344            TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
345            TcpStateMachine::Pipe(_) => None,
346            TcpStateMachine::FailedUpgrade(_) => todo!(),
347        };
348
349        match new_state {
350            Some(state) => {
351                self.state = state;
352                false
353            } // The state stays FailedUpgrade, but the Session should be closed right after
354
355            None => true,
356        }
357    }
358
359    fn upgrade_send(
360        &mut self,
361        send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
362    ) -> Option<TcpStateMachine> {
363        if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
364            let mut pipe = send_proxy_protocol.into_pipe(
365                self.frontend_buffer.take().unwrap(),
366                self.backend_buffer.take().unwrap(),
367                self.listener.clone(),
368            );
369
370            pipe.set_cluster_id(self.cluster_id.clone());
371            gauge_add!("protocol.proxy.send", -1);
372            gauge_add!("protocol.tcp", 1);
373            return Some(TcpStateMachine::Pipe(pipe));
374        }
375
376        error!(
377            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
378            log_context!(self)
379        );
380        None
381    }
382
383    fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
384        if self.backend_buffer.is_some() {
385            let mut pipe =
386                rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
387            pipe.set_cluster_id(self.cluster_id.clone());
388            gauge_add!("protocol.proxy.relay", -1);
389            gauge_add!("protocol.tcp", 1);
390            return Some(TcpStateMachine::Pipe(pipe));
391        }
392
393        error!(
394            "{} Missing the backend buffer queue, we can't switch to a pipe",
395            log_context!(self)
396        );
397        None
398    }
399
400    fn upgrade_expect(
401        &mut self,
402        epp: ExpectProxyProtocol<MioTcpStream>,
403    ) -> Option<TcpStateMachine> {
404        if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
405            let mut pipe = epp.into_pipe(
406                self.frontend_buffer.take().unwrap(),
407                self.backend_buffer.take().unwrap(),
408                None,
409                None,
410                self.listener.clone(),
411            );
412
413            pipe.set_cluster_id(self.cluster_id.clone());
414            gauge_add!("protocol.proxy.expect", -1);
415            gauge_add!("protocol.tcp", 1);
416            return Some(TcpStateMachine::Pipe(pipe));
417        }
418
419        error!(
420            "{} Missing the backend buffer queue, we can't switch to a pipe",
421            log_context!(self)
422        );
423        None
424    }
425
426    fn front_readiness(&mut self) -> &mut Readiness {
427        match &mut self.state {
428            TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
429            TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
430            TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
431            TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
432            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
433        }
434    }
435
436    fn back_readiness(&mut self) -> Option<&mut Readiness> {
437        match &mut self.state {
438            TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
439            TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
440            TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
441            TcpStateMachine::ExpectProxyProtocol(_) => None,
442            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
443        }
444    }
445
446    fn set_back_socket(&mut self, socket: MioTcpStream) {
447        match &mut self.state {
448            TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
449            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
450            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
451            TcpStateMachine::ExpectProxyProtocol(_) => {
452                error!(
453                    "{} We should not set the back socket for the expect proxy protocol",
454                    log_context!(self)
455                );
456                panic!(
457                    "{} We should not set the back socket for the expect proxy protocol",
458                    log_context!(self)
459                );
460            }
461            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
462        }
463    }
464
465    fn set_back_token(&mut self, token: Token) {
466        self.backend_token = Some(token);
467
468        match &mut self.state {
469            TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
470            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
471            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
472            TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
473            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
474        }
475    }
476
477    fn set_backend_id(&mut self, id: String) {
478        self.backend_id = Some(id.clone());
479        if let TcpStateMachine::Pipe(pipe) = &mut self.state {
480            pipe.set_backend_id(Some(id));
481        }
482    }
483
484    fn back_connected(&self) -> BackendConnectionStatus {
485        self.backend_connected
486    }
487
488    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
489        let last = self.backend_connected;
490        self.backend_connected = status;
491
492        if status == BackendConnectionStatus::Connected {
493            gauge_add!("backend.connections", 1);
494            gauge_add!(
495                "connections_per_backend",
496                1,
497                self.cluster_id.as_deref(),
498                self.metrics.backend_id.as_deref()
499            );
500
501            // the back timeout was of connect_timeout duration before,
502            // now that we're connected, move to backend_timeout duration
503            self.container_backend_timeout
504                .set_duration(self.configured_backend_timeout);
505            self.container_frontend_timeout.reset();
506
507            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
508                spp.set_back_connected(BackendConnectionStatus::Connected);
509            }
510
511            if let Some(backend) = self.backend.as_ref() {
512                let mut backend = backend.borrow_mut();
513
514                if backend.retry_policy.is_down() {
515                    incr!(
516                        "backend.up",
517                        self.cluster_id.as_deref(),
518                        self.metrics.backend_id.as_deref()
519                    );
520                    info!(
521                        "backend server {} at {} is up",
522                        backend.backend_id, backend.address
523                    );
524                    push_event(Event {
525                        kind: EventKind::BackendUp as i32,
526                        backend_id: Some(backend.backend_id.to_owned()),
527                        address: Some(backend.address.into()),
528                        cluster_id: None,
529                    });
530                }
531
532                if let BackendConnectionStatus::Connecting(start) = last {
533                    backend.set_connection_time(Instant::now() - start);
534                }
535
536                //successful connection, rest failure counter
537                backend.failures = 0;
538                backend.retry_policy.succeed();
539            }
540        }
541    }
542
543    fn remove_backend(&mut self) {
544        if let Some(backend) = self.backend.take() {
545            (*backend.borrow_mut()).dec_connections();
546        }
547
548        self.backend_token = None;
549    }
550
551    fn fail_backend_connection(&mut self) {
552        if let Some(backend) = self.backend.as_ref() {
553            let backend = &mut *backend.borrow_mut();
554            backend.failures += 1;
555
556            let already_unavailable = backend.retry_policy.is_down();
557            backend.retry_policy.fail();
558            incr!(
559                "backend.connections.error",
560                self.cluster_id.as_deref(),
561                self.metrics.backend_id.as_deref()
562            );
563            if !already_unavailable && backend.retry_policy.is_down() {
564                error!(
565                    "backend server {} at {} is down",
566                    backend.backend_id, backend.address
567                );
568                incr!(
569                    "backend.down",
570                    self.cluster_id.as_deref(),
571                    self.metrics.backend_id.as_deref()
572                );
573
574                push_event(Event {
575                    kind: EventKind::BackendDown as i32,
576                    backend_id: Some(backend.backend_id.to_owned()),
577                    address: Some(backend.address.into()),
578                    cluster_id: None,
579                });
580            }
581        }
582    }
583
584    pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
585        match self.back_socket_mut() {
586            Some(ref mut s) => {
587                let mut tmp = [0u8; 1];
588                let res = s.peek(&mut tmp[..]);
589
590                match res {
591                    // if the socket is half open, it will report 0 bytes read (EOF)
592                    Ok(0) => false,
593                    Ok(_) => true,
594                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
595                }
596            }
597            None => false,
598        }
599    }
600
601    pub fn cancel_timeouts(&mut self) {
602        self.container_frontend_timeout.cancel();
603        self.container_backend_timeout.cancel();
604    }
605
/// Processes the pending events of the session.
///
/// The function works in three steps:
///
/// 1. it takes care of the backend connection: a pending connection is either
///    confirmed or retried, and a missing one is started;
/// 2. it handles a hang-up of the frontend;
/// 3. it dispatches the frontend and backend events to the current state, for
///    at most `MAX_LOOP_ITERATIONS` iterations.
///
/// `session` is the reference-counted handle passed to `connect_to_backend`
/// when a backend connection has to be (re)created.
///
/// Returns the first result that is not `SessionResult::Continue`, or
/// `SessionResult::Close` when the iteration limit is reached.
fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
    let mut counter = 0;

    let back_connected = self.back_connected();
    if back_connected.is_connecting() {
        // NOTE(review): the unwraps below panic if the state has no backend
        // readiness (`ExpectProxyProtocol`) — presumably unreachable while connecting.
        if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
            // the backend hung up and the socket probe failed: retry connecting the backend
            debug!("error connecting to backend, trying again");
            self.connection_attempt += 1;
            self.fail_backend_connection();

            // trigger a backend reconnection
            self.close_backend();
            let connection_result = self.connect_to_backend(session.clone());
            if let Err(err) = &connection_result {
                error!(
                    "{} Error connecting to backend: {}",
                    log_context!(self),
                    err
                );
            }

            if let Some(state_result) = handle_connection_result(connection_result) {
                return state_result;
            }
        } else if self.back_readiness().unwrap().event != Ready::EMPTY {
            // any other event on the backend socket means the connection is established
            self.connection_attempt = 0;
            self.set_back_connected(BackendConnectionStatus::Connected);
        }
    } else if back_connected == BackendConnectionStatus::NotConnected {
        // no backend yet: start the first connection
        let connection_result = self.connect_to_backend(session.clone());
        if let Err(err) = &connection_result {
            error!(
                "{} Error connecting to backend: {}",
                log_context!(self),
                err
            );
        }

        if let Some(state_result) = handle_connection_result(connection_result) {
            return state_result;
        }
    }

    if self.front_readiness().event.is_hup() {
        let session_result = self.front_hup();
        if session_result == SessionResult::Continue {
            // the state wants to go on: consume the HUP event
            self.front_readiness().event.remove(Ready::HUP);
        }
        return session_result;
    }

    while counter < MAX_LOOP_ITERATIONS {
        // only the events the state is interested in are considered
        let front_interest = self.front_readiness().interest & self.front_readiness().event;
        let back_interest = self
            .back_readiness()
            .map(|r| r.interest & r.event)
            .unwrap_or(Ready::EMPTY);

        trace!(
            "{} Frontend interest({:?}) and backend interest({:?})",
            log_context!(self),
            front_interest,
            back_interest
        );

        // nothing left to process
        if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
            break;
        }

        // the backend hung up while the frontend wants to write but is not
        // writable: stop here
        // NOTE(review): presumably to wait for the next frontend writable event — confirm
        if self
            .back_readiness()
            .map(|r| r.event.is_hup())
            .unwrap_or(false)
            && self.front_readiness().interest.is_writable()
            && !self.front_readiness().event.is_writable()
        {
            break;
        }

        if front_interest.is_readable() {
            let session_result = self.readable();
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if back_interest.is_writable() {
            let session_result = self.back_writable();
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if back_interest.is_readable() {
            let session_result = self.back_readable();
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if front_interest.is_writable() {
            let session_result = self.writable();
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if back_interest.is_hup() {
            let session_result = self.back_hup();
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if front_interest.is_error() {
            error!(
                "{} Frontend socket error, disconnecting",
                log_context!(self)
            );
            // drop every interest before closing
            self.front_readiness().interest = Ready::EMPTY;
            if let Some(r) = self.back_readiness() {
                r.interest = Ready::EMPTY;
            }

            return SessionResult::Close;
        }

        // a backend error is handled like a backend hang-up
        if back_interest.is_error() && self.back_hup() == SessionResult::Close {
            self.front_readiness().interest = Ready::EMPTY;
            if let Some(r) = self.back_readiness() {
                r.interest = Ready::EMPTY;
            }

            error!("{} backend socket error, disconnecting", log_context!(self));
            return SessionResult::Close;
        }

        counter += 1;
    }

    if counter >= MAX_LOOP_ITERATIONS {
        error!(
            "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
            log_context!(self), MAX_LOOP_ITERATIONS
        );

        incr!("tcp.infinite_loop.error");

        // log the readiness of both sockets to help the investigation
        let front_interest = self.front_readiness().interest & self.front_readiness().event;
        let back_interest = self
            .back_readiness()
            .map(|r| r.interest & r.event)
            .unwrap_or(Ready::EMPTY);

        let back = self.back_readiness().cloned();

        error!(
            "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
            log_context!(self),
            self.front_readiness(),
            back,
            front_interest,
            back_interest
        );

        self.print_session();

        return SessionResult::Close;
    }

    SessionResult::Continue
}
779
/// TCP session closes its backend on its own, without deferring this task to the state
///
/// The steps are, in order:
///
/// 1. deregister the backend socket from the registry and remove the backend
///    token from the session slab;
/// 2. detach the session from its backend (see `remove_backend`);
/// 3. if the backend was connecting or connected, clear the backend events and
///    shut the socket down;
/// 4. if the backend was connected, decrement the connection gauges;
/// 5. set the connection status to `NotConnected`.
fn close_backend(&mut self) {
    if let (Some(token), Some(fd)) = (
        self.backend_token,
        self.back_socket_mut().map(|s| s.as_raw_fd()),
    ) {
        let proxy = self.proxy.borrow();
        // a failed deregistration is only logged, the teardown goes on
        if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
            error!(
                "{} Error deregistering socket({:?}): {:?}",
                log_context!(self),
                fd,
                e
            );
        }

        proxy.sessions.borrow_mut().slab.try_remove(token.0);
    }
    self.remove_backend();

    let back_connected = self.back_connected();
    if back_connected != BackendConnectionStatus::NotConnected {
        if let Some(r) = self.back_readiness() {
            r.event = Ready::EMPTY;
        }

        // built before the socket is mutably borrowed from `self`
        let log_context = log_context!(self);
        if let Some(sock) = self.back_socket_mut() {
            if let Err(e) = sock.shutdown(Shutdown::Both) {
                // a socket that is not connected is not reported as an error
                if e.kind() != ErrorKind::NotConnected {
                    error!(
                        "{} Error closing back socket({:?}): {:?}",
                        log_context, sock, e
                    );
                }
            }
        }
    }

    // the gauges were incremented when the status became `Connected`
    if back_connected == BackendConnectionStatus::Connected {
        gauge_add!("backend.connections", -1);
        gauge_add!(
            "connections_per_backend",
            -1,
            self.cluster_id.as_deref(),
            self.metrics.backend_id.as_deref()
        );
    }

    self.set_back_connected(BackendConnectionStatus::NotConnected);
}
831
832    fn connect_to_backend(
833        &mut self,
834        session_rc: Rc<RefCell<dyn ProxySession>>,
835    ) -> Result<BackendConnectAction, BackendConnectionError> {
836        let cluster_id = self
837            .listener
838            .borrow()
839            .cluster_id
840            .clone()
841            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
842
843        self.cluster_id = Some(cluster_id.clone());
844
845        if self.connection_attempt >= CONN_RETRIES {
846            error!(
847                "{} Max connection attempt reached ({})",
848                log_context!(self),
849                self.connection_attempt
850            );
851            return Err(BackendConnectionError::MaxConnectionRetries(Some(
852                cluster_id,
853            )));
854        }
855
856        if self.proxy.borrow().sessions.borrow().at_capacity() {
857            return Err(BackendConnectionError::MaxSessionsMemory);
858        }
859
860        let (backend, mut stream) = self
861            .proxy
862            .borrow()
863            .backends
864            .borrow_mut()
865            .backend_from_cluster_id(&cluster_id)
866            .map_err(BackendConnectionError::Backend)?;
867
868        if let Err(e) = stream.set_nodelay(true) {
869            error!(
870                "{} Error setting nodelay on back socket({:?}): {:?}",
871                log_context!(self),
872                stream,
873                e
874            );
875        }
876        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
877
878        let back_token = {
879            let proxy = self.proxy.borrow();
880            let mut s = proxy.sessions.borrow_mut();
881            let entry = s.slab.vacant_entry();
882            let back_token = Token(entry.key());
883            let _entry = entry.insert(session_rc.clone());
884            back_token
885        };
886
887        if let Err(e) = self.proxy.borrow().registry.register(
888            &mut stream,
889            back_token,
890            Interest::READABLE | Interest::WRITABLE,
891        ) {
892            error!(
893                "{} Error registering back socket({:?}): {:?}",
894                log_context!(self),
895                stream,
896                e
897            );
898        }
899
900        self.container_backend_timeout.set(back_token);
901
902        self.set_back_token(back_token);
903        self.set_back_socket(stream);
904
905        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
906        self.metrics.backend_start();
907        self.set_backend_id(backend.borrow().backend_id.clone());
908
909        Ok(BackendConnectAction::New)
910    }
911}
912
913impl ProxySession for TcpSession {
914    fn close(&mut self) {
915        if self.has_been_closed {
916            return;
917        }
918
919        // TODO: the state should handle the timeouts
920        trace!("{} Closing TCP session", log_context!(self));
921        self.metrics.service_stop();
922
923        // Restore gauges
924        match self.state.marker() {
925            StateMarker::Pipe => gauge_add!("protocol.tcp", -1),
926            StateMarker::SendProxyProtocol => gauge_add!("protocol.proxy.send", -1),
927            StateMarker::RelayProxyProtocol => gauge_add!("protocol.proxy.relay", -1),
928            StateMarker::ExpectProxyProtocol => gauge_add!("protocol.proxy.expect", -1),
929        }
930
931        if self.state.failed() {
932            match self.state.marker() {
933                StateMarker::Pipe => incr!("tcp.upgrade.pipe.failed"),
934                StateMarker::SendProxyProtocol => incr!("tcp.upgrade.send.failed"),
935                StateMarker::RelayProxyProtocol => incr!("tcp.upgrade.relay.failed"),
936                StateMarker::ExpectProxyProtocol => incr!("tcp.upgrade.expect.failed"),
937            }
938            return;
939        }
940
941        self.cancel_timeouts();
942
943        let front_socket = self.state.front_socket();
944        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
945            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
946            if e.kind() != ErrorKind::NotConnected {
947                error!(
948                    "{} Error shutting down front socket({:?}): {:?}",
949                    log_context!(self),
950                    front_socket,
951                    e
952                );
953            }
954        }
955
956        // deregister the frontend and remove it, in a separate scope to drop proxy when done
957        {
958            let proxy = self.proxy.borrow();
959            let fd = front_socket.as_raw_fd();
960            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
961                error!(
962                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
963                    log_context!(self),
964                    fd,
965                    e
966                );
967            }
968            proxy
969                .sessions
970                .borrow_mut()
971                .slab
972                .try_remove(self.frontend_token.0);
973        }
974
975        self.close_backend();
976        self.has_been_closed = true;
977    }
978
979    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
980        if self.frontend_token == token {
981            self.container_frontend_timeout.triggered();
982            return true;
983        }
984        if self.backend_token == Some(token) {
985            self.container_backend_timeout.triggered();
986            return true;
987        }
988        // invalid token, obsolete timeout triggered
989        false
990    }
991
992    fn protocol(&self) -> Protocol {
993        Protocol::TCP
994    }
995
996    fn update_readiness(&mut self, token: Token, events: Ready) {
997        trace!(
998            "{} token {:?} got event {}",
999            log_context!(self),
1000            token,
1001            super::ready_to_string(events)
1002        );
1003
1004        self.last_event = Instant::now();
1005        self.metrics.wait_start();
1006
1007        if self.frontend_token == token {
1008            self.front_readiness().event = self.front_readiness().event | events;
1009        } else if self.backend_token == Some(token) {
1010            if let Some(r) = self.back_readiness() {
1011                r.event |= events;
1012            }
1013        }
1014    }
1015
1016    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1017        self.metrics.service_start();
1018
1019        let session_result = self.ready_inner(session.clone());
1020
1021        let to_bo_closed = match session_result {
1022            SessionResult::Close => true,
1023            SessionResult::Continue => false,
1024            SessionResult::Upgrade => match self.upgrade() {
1025                false => self.ready(session),
1026                true => true,
1027            },
1028        };
1029
1030        self.metrics.service_stop();
1031        to_bo_closed
1032    }
1033
1034    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1035        true
1036    }
1037
1038    fn last_event(&self) -> Instant {
1039        self.last_event
1040    }
1041
1042    fn print_session(&self) {
1043        let state: String = match &self.state {
1044            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
1045            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
1046            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
1047            TcpStateMachine::Pipe(_) => String::from("TCP"),
1048            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
1049        };
1050
1051        let front_readiness = match &self.state {
1052            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
1053            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
1054            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
1055            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
1056            TcpStateMachine::FailedUpgrade(_) => None,
1057        };
1058
1059        let back_readiness = match &self.state {
1060            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
1061            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
1062            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
1063            TcpStateMachine::ExpectProxyProtocol(_) => None,
1064            TcpStateMachine::FailedUpgrade(_) => None,
1065        };
1066
1067        error!(
1068            "\
1069{} Session ({:?})
1070\tFrontend:
1071\t\ttoken: {:?}\treadiness: {:?}
1072\tBackend:
1073\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
1074            log_context!(self),
1075            state,
1076            self.frontend_token,
1077            front_readiness,
1078            self.backend_token,
1079            back_readiness,
1080            self.backend_connected,
1081            self.cluster_id
1082        );
1083        error!("Metrics: {:?}", self.metrics);
1084    }
1085
1086    fn frontend_token(&self) -> Token {
1087        self.frontend_token
1088    }
1089}
1090
1091pub struct TcpListener {
1092    active: SessionIsToBeClosed,
1093    address: SocketAddr,
1094    cluster_id: Option<String>,
1095    config: TcpListenerConfig,
1096    listener: Option<MioTcpListener>,
1097    tags: BTreeMap<String, CachedTags>,
1098    token: Token,
1099}
1100
1101impl ListenerHandler for TcpListener {
1102    fn get_addr(&self) -> &SocketAddr {
1103        &self.address
1104    }
1105
1106    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1107        self.tags.get(key)
1108    }
1109
1110    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1111        match tags {
1112            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1113            None => self.tags.remove(&key),
1114        };
1115    }
1116}
1117
1118impl TcpListener {
1119    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1120        Ok(TcpListener {
1121            cluster_id: None,
1122            listener: None,
1123            token,
1124            address: config.address.into(),
1125            config,
1126            active: false,
1127            tags: BTreeMap::new(),
1128        })
1129    }
1130
1131    pub fn activate(
1132        &mut self,
1133        registry: &Registry,
1134        tcp_listener: Option<MioTcpListener>,
1135    ) -> Result<Token, ProxyError> {
1136        if self.active {
1137            return Ok(self.token);
1138        }
1139
1140        let mut listener = match tcp_listener {
1141            Some(listener) => listener,
1142            None => {
1143                let address = self.config.address.into();
1144                server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1145            }
1146        };
1147
1148        registry
1149            .register(&mut listener, self.token, Interest::READABLE)
1150            .map_err(ProxyError::RegisterListener)?;
1151
1152        self.listener = Some(listener);
1153        self.active = true;
1154        Ok(self.token)
1155    }
1156}
1157
1158fn handle_connection_result(
1159    connection_result: Result<BackendConnectAction, BackendConnectionError>,
1160) -> Option<SessionResult> {
1161    match connection_result {
1162        // reuse connection or send a default answer, we can continue
1163        Ok(BackendConnectAction::Reuse) => None,
1164        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1165            // we must wait for an event
1166            Some(SessionResult::Continue)
1167        }
1168        Err(_) => {
1169            // in case of BackendConnectionError::Backend(BackendError::ConnectionFailures(..))
1170            // we may want to retry instead of closing
1171            Some(SessionResult::Close)
1172        }
1173    }
1174}
1175
/// Per-cluster settings kept by the TCP proxy, stored in `TcpProxy::configs`.
#[derive(Debug)]
pub struct ClusterConfiguration {
    /// Proxy protocol setting of the cluster, `None` when the cluster request carries none
    /// or when its value could not be converted to a `ProxyProtocolConfig`.
    proxy_protocol: Option<ProxyProtocolConfig>,
    // Uncomment this when implementing new load balancing algorithms
    // load_balancing: LoadBalancingAlgorithms,
}
1182
1183pub struct TcpProxy {
1184    fronts: HashMap<String, Token>,
1185    backends: Rc<RefCell<BackendMap>>,
1186    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
1187    configs: HashMap<ClusterId, ClusterConfiguration>,
1188    registry: Registry,
1189    sessions: Rc<RefCell<SessionManager>>,
1190    pool: Rc<RefCell<Pool>>,
1191}
1192
1193impl TcpProxy {
1194    pub fn new(
1195        registry: Registry,
1196        sessions: Rc<RefCell<SessionManager>>,
1197        pool: Rc<RefCell<Pool>>,
1198        backends: Rc<RefCell<BackendMap>>,
1199    ) -> TcpProxy {
1200        TcpProxy {
1201            backends,
1202            listeners: HashMap::new(),
1203            configs: HashMap::new(),
1204            fronts: HashMap::new(),
1205            registry,
1206            sessions,
1207            pool,
1208        }
1209    }
1210
1211    pub fn add_listener(
1212        &mut self,
1213        config: TcpListenerConfig,
1214        token: Token,
1215    ) -> Result<Token, ProxyError> {
1216        match self.listeners.entry(token) {
1217            Entry::Vacant(entry) => {
1218                let tcp_listener =
1219                    TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1220                entry.insert(Rc::new(RefCell::new(tcp_listener)));
1221                Ok(token)
1222            }
1223            _ => Err(ProxyError::ListenerAlreadyPresent),
1224        }
1225    }
1226
1227    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1228        let len = self.listeners.len();
1229
1230        self.listeners.retain(|_, l| l.borrow().address != address);
1231        self.listeners.len() < len
1232    }
1233
1234    pub fn activate_listener(
1235        &self,
1236        addr: &SocketAddr,
1237        tcp_listener: Option<MioTcpListener>,
1238    ) -> Result<Token, ProxyError> {
1239        let listener = self
1240            .listeners
1241            .values()
1242            .find(|listener| listener.borrow().address == *addr)
1243            .ok_or(ProxyError::NoListenerFound(*addr))?;
1244
1245        listener.borrow_mut().activate(&self.registry, tcp_listener)
1246    }
1247
1248    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1249        self.listeners
1250            .values()
1251            .filter_map(|listener| {
1252                let mut owned = listener.borrow_mut();
1253                if let Some(listener) = owned.listener.take() {
1254                    return Some((owned.address, listener));
1255                }
1256
1257                None
1258            })
1259            .collect()
1260    }
1261
1262    pub fn give_back_listener(
1263        &mut self,
1264        address: SocketAddr,
1265    ) -> Result<(Token, MioTcpListener), ProxyError> {
1266        let listener = self
1267            .listeners
1268            .values()
1269            .find(|listener| listener.borrow().address == address)
1270            .ok_or(ProxyError::NoListenerFound(address))?;
1271
1272        let mut owned = listener.borrow_mut();
1273
1274        let taken_listener = owned
1275            .listener
1276            .take()
1277            .ok_or(ProxyError::UnactivatedListener)?;
1278
1279        Ok((owned.token, taken_listener))
1280    }
1281
1282    pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1283        let address = front.address.into();
1284
1285        let mut listener = self
1286            .listeners
1287            .values()
1288            .find(|l| l.borrow().address == address)
1289            .ok_or(ProxyError::NoListenerFound(address))?
1290            .borrow_mut();
1291
1292        self.fronts
1293            .insert(front.cluster_id.to_string(), listener.token);
1294        listener.set_tags(address.to_string(), Some(front.tags));
1295        listener.cluster_id = Some(front.cluster_id);
1296        Ok(())
1297    }
1298
1299    pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1300        let address = front.address.into();
1301
1302        let mut listener = match self
1303            .listeners
1304            .values()
1305            .find(|l| l.borrow().address == address)
1306        {
1307            Some(l) => l.borrow_mut(),
1308            None => return Err(ProxyError::NoListenerFound(address)),
1309        };
1310
1311        listener.set_tags(address.to_string(), None);
1312        if let Some(cluster_id) = listener.cluster_id.take() {
1313            self.fronts.remove(&cluster_id);
1314        }
1315        Ok(())
1316    }
1317}
1318
1319impl ProxyConfiguration for TcpProxy {
1320    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1321        let request_type = match message.content.request_type {
1322            Some(t) => t,
1323            None => return WorkerResponse::error(message.id, "Empty request"),
1324        };
1325        match request_type {
1326            RequestType::AddTcpFrontend(front) => {
1327                if let Err(err) = self.add_tcp_front(front) {
1328                    return WorkerResponse::error(message.id, err);
1329                }
1330
1331                WorkerResponse::ok(message.id)
1332            }
1333            RequestType::RemoveTcpFrontend(front) => {
1334                if let Err(err) = self.remove_tcp_front(front) {
1335                    return WorkerResponse::error(message.id, err);
1336                }
1337
1338                WorkerResponse::ok(message.id)
1339            }
1340            RequestType::SoftStop(_) => {
1341                info!("{} processing soft shutdown", message.id);
1342                let listeners: HashMap<_, _> = self.listeners.drain().collect();
1343                for (_, l) in listeners.iter() {
1344                    l.borrow_mut()
1345                        .listener
1346                        .take()
1347                        .map(|mut sock| self.registry.deregister(&mut sock));
1348                }
1349                WorkerResponse::processing(message.id)
1350            }
1351            RequestType::HardStop(_) => {
1352                info!("{} hard shutdown", message.id);
1353                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1354                for (_, l) in listeners.drain() {
1355                    l.borrow_mut()
1356                        .listener
1357                        .take()
1358                        .map(|mut sock| self.registry.deregister(&mut sock));
1359                }
1360                WorkerResponse::ok(message.id)
1361            }
1362            RequestType::Status(_) => {
1363                info!("{} status", message.id);
1364                WorkerResponse::ok(message.id)
1365            }
1366            RequestType::AddCluster(cluster) => {
1367                let config = ClusterConfiguration {
1368                    proxy_protocol: cluster
1369                        .proxy_protocol
1370                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1371                    //load_balancing: cluster.load_balancing,
1372                };
1373                self.configs.insert(cluster.cluster_id, config);
1374                WorkerResponse::ok(message.id)
1375            }
1376            RequestType::RemoveCluster(cluster_id) => {
1377                self.configs.remove(&cluster_id);
1378                WorkerResponse::ok(message.id)
1379            }
1380            RequestType::RemoveListener(remove) => {
1381                if !self.remove_listener(remove.address.into()) {
1382                    WorkerResponse::error(
1383                        message.id,
1384                        format!("no TCP listener to remove at address {:?}", remove.address),
1385                    )
1386                } else {
1387                    WorkerResponse::ok(message.id)
1388                }
1389            }
1390            command => {
1391                debug!(
1392                    "{} unsupported message for TCP proxy, ignoring {:?}",
1393                    message.id, command
1394                );
1395                WorkerResponse::error(message.id, "unsupported message")
1396            }
1397        }
1398    }
1399
1400    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1401        let internal_token = Token(token.0);
1402        if let Some(listener) = self.listeners.get(&internal_token) {
1403            if let Some(tcp_listener) = &listener.borrow().listener {
1404                tcp_listener
1405                    .accept()
1406                    .map(|(frontend_sock, _)| frontend_sock)
1407                    .map_err(|e| match e.kind() {
1408                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
1409                        _ => {
1410                            error!("accept() IO error: {:?}", e);
1411                            AcceptError::IoError
1412                        }
1413                    })
1414            } else {
1415                Err(AcceptError::IoError)
1416            }
1417        } else {
1418            Err(AcceptError::IoError)
1419        }
1420    }
1421
1422    fn create_session(
1423        &mut self,
1424        mut frontend_sock: MioTcpStream,
1425        token: ListenToken,
1426        wait_time: Duration,
1427        proxy: Rc<RefCell<Self>>,
1428    ) -> Result<(), AcceptError> {
1429        let listener_token = Token(token.0);
1430
1431        let listener = self
1432            .listeners
1433            .get(&listener_token)
1434            .ok_or(AcceptError::IoError)?;
1435
1436        let owned = listener.borrow();
1437        let mut pool = self.pool.borrow_mut();
1438
1439        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1440            (Some(fb), Some(bb)) => (fb, bb),
1441            _ => {
1442                error!("could not get buffers from pool");
1443                error!(
1444                    "Buffer capacity has been reached, stopping to accept new connections for now"
1445                );
1446                gauge!("accept_queue.backpressure", 1);
1447                self.sessions.borrow_mut().can_accept = false;
1448
1449                return Err(AcceptError::BufferCapacityReached);
1450            }
1451        };
1452
1453        if owned.cluster_id.is_none() {
1454            error!(
1455                "listener at address {:?} has no linked cluster",
1456                owned.address
1457            );
1458            return Err(AcceptError::IoError);
1459        }
1460
1461        let proxy_protocol = self
1462            .configs
1463            .get(owned.cluster_id.as_ref().unwrap())
1464            .and_then(|c| c.proxy_protocol);
1465
1466        if let Err(e) = frontend_sock.set_nodelay(true) {
1467            error!(
1468                "error setting nodelay on front socket({:?}): {:?}",
1469                frontend_sock, e
1470            );
1471        }
1472
1473        let mut session_manager = self.sessions.borrow_mut();
1474        let entry = session_manager.slab.vacant_entry();
1475        let frontend_token = Token(entry.key());
1476
1477        if let Err(register_error) = self.registry.register(
1478            &mut frontend_sock,
1479            frontend_token,
1480            Interest::READABLE | Interest::WRITABLE,
1481        ) {
1482            error!(
1483                "error registering front socket({:?}): {:?}",
1484                frontend_sock, register_error
1485            );
1486            return Err(AcceptError::RegisterError);
1487        }
1488
1489        let session = TcpSession::new(
1490            back_buffer,
1491            None,
1492            owned.cluster_id.clone(),
1493            Duration::from_secs(owned.config.back_timeout as u64),
1494            Duration::from_secs(owned.config.connect_timeout as u64),
1495            Duration::from_secs(owned.config.front_timeout as u64),
1496            front_buffer,
1497            frontend_token,
1498            listener.clone(),
1499            proxy_protocol,
1500            proxy,
1501            frontend_sock,
1502            wait_time,
1503        );
1504        incr!("tcp.requests");
1505
1506        let session = Rc::new(RefCell::new(session));
1507        entry.insert(session);
1508
1509        Ok(())
1510    }
1511}
1512
1513pub mod testing {
1514    use crate::testing::*;
1515
1516    /// This is not directly used by Sōzu but is available for example and testing purposes
1517    pub fn start_tcp_worker(
1518        config: TcpListenerConfig,
1519        max_buffers: usize,
1520        buffer_size: usize,
1521        channel: ProxyChannel,
1522    ) -> anyhow::Result<()> {
1523        let address = config.address.into();
1524
1525        let ServerParts {
1526            event_loop,
1527            registry,
1528            sessions,
1529            pool,
1530            backends,
1531            client_scm_socket: _,
1532            server_scm_socket,
1533            server_config,
1534        } = prebuild_server(max_buffers, buffer_size, true)?;
1535
1536        let token = {
1537            let mut sessions = sessions.borrow_mut();
1538            let entry = sessions.slab.vacant_entry();
1539            let key = entry.key();
1540            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1541                protocol: Protocol::TCPListen,
1542            })));
1543            Token(key)
1544        };
1545
1546        let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1547        proxy
1548            .add_listener(config, token)
1549            .with_context(|| "Failed at creating adding the listener")?;
1550        proxy
1551            .activate_listener(&address, None)
1552            .with_context(|| "Failed at creating activating the listener")?;
1553
1554        let mut server = Server::new(
1555            event_loop,
1556            channel,
1557            server_scm_socket,
1558            sessions,
1559            pool,
1560            backends,
1561            None,
1562            None,
1563            Some(proxy),
1564            server_config,
1565            None,
1566            false,
1567        )
1568        .with_context(|| "Failed at creating server")?;
1569
1570        debug!("starting event loop");
1571        server.run();
1572        debug!("ending event loop");
1573        Ok(())
1574    }
1575}
1576
1577#[cfg(test)]
1578mod tests {
1579    use super::testing::start_tcp_worker;
1580    use crate::testing::*;
1581
1582    use sozu_command::proto::command::SocketAddress;
1583    use std::{
1584        io::{Read, Write},
1585        net::{Shutdown, TcpListener, TcpStream},
1586        str,
1587        sync::{
1588            atomic::{AtomicBool, Ordering},
1589            Arc, Barrier,
1590        },
1591        thread,
1592    };
1593
1594    use sozu_command::{
1595        channel::Channel,
1596        config::ListenerBuilder,
1597        proto::command::{
1598            request::RequestType, LoadBalancingParams, RequestTcpFrontend, WorkerRequest,
1599            WorkerResponse,
1600        },
1601    };
1602    static TEST_FINISHED: AtomicBool = AtomicBool::new(false);
1603
1604    /*
1605    #[test]
1606    #[cfg(target_pointer_width = "64")]
1607    fn size_test() {
1608      assert_size!(Pipe<mio::net::TcpStream>, 224);
1609      assert_size!(SendProxyProtocol<mio::net::TcpStream>, 144);
1610      assert_size!(RelayProxyProtocol<mio::net::TcpStream>, 152);
1611      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1612      assert_size!(State, 528);
1613      // fails depending on the platform?
1614      //assert_size!(Session, 808);
1615    }*/
1616
1617    #[test]
1618    fn round_trip() {
1619        setup_test_logger!();
1620        let barrier = Arc::new(Barrier::new(2));
1621        start_server(barrier.clone());
1622        let _tx = start_proxy().expect("Could not start proxy");
1623        barrier.wait();
1624
1625        let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1626        let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1627        let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1628
1629        s1.write(&b"hello "[..])
1630            .map_err(|e| {
1631                TEST_FINISHED.store(true, Ordering::Relaxed);
1632                e
1633            })
1634            .unwrap();
1635        println!("s1 sent");
1636
1637        s2.write(&b"pouet pouet"[..])
1638            .map_err(|e| {
1639                TEST_FINISHED.store(true, Ordering::Relaxed);
1640                e
1641            })
1642            .unwrap();
1643
1644        println!("s2 sent");
1645
1646        let mut res = [0; 128];
1647        s1.write(&b"coucou"[..])
1648            .map_err(|e| {
1649                TEST_FINISHED.store(true, Ordering::Relaxed);
1650                e
1651            })
1652            .unwrap();
1653
1654        s3.shutdown(Shutdown::Both).unwrap();
1655        let sz2 = s2
1656            .read(&mut res[..])
1657            .map_err(|e| {
1658                TEST_FINISHED.store(true, Ordering::Relaxed);
1659                e
1660            })
1661            .expect("could not read from socket");
1662        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
1663        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);
1664
1665        let sz1 = s1
1666            .read(&mut res[..])
1667            .map_err(|e| {
1668                TEST_FINISHED.store(true, Ordering::Relaxed);
1669                e
1670            })
1671            .expect("could not read from socket");
1672        println!(
1673            "s1 received again({}): {:?}",
1674            sz1,
1675            str::from_utf8(&res[..sz1])
1676        );
1677        assert_eq!(&res[..sz1], &b"hello coucou"[..]);
1678        TEST_FINISHED.store(true, Ordering::Relaxed);
1679    }
1680
1681    fn start_server(barrier: Arc<Barrier>) {
1682        let listener = TcpListener::bind("127.0.0.1:5678").expect("could not bind");
1683        fn handle_client(stream: &mut TcpStream, id: u8) {
1684            let mut buf = [0; 128];
1685            let _response = b" END";
1686            while let Ok(sz) = stream.read(&mut buf[..]) {
1687                if sz > 0 {
1688                    println!("ECHO[{}] got \"{:?}\"", id, str::from_utf8(&buf[..sz]));
1689                    stream.write(&buf[..sz]).unwrap();
1690                }
1691                if TEST_FINISHED.load(Ordering::Relaxed) {
1692                    println!("backend server stopping");
1693                    break;
1694                }
1695            }
1696        }
1697
1698        let mut count = 0;
1699        thread::spawn(move || {
1700            barrier.wait();
1701            for conn in listener.incoming() {
1702                match conn {
1703                    Ok(mut stream) => {
1704                        thread::spawn(move || {
1705                            println!("got a new client: {count}");
1706                            handle_client(&mut stream, count)
1707                        });
1708                    }
1709                    Err(e) => {
1710                        println!("connection failed: {e:?}");
1711                    }
1712                }
1713                count += 1;
1714            }
1715        });
1716    }
1717
1718    /// used in tests only
1719    pub fn start_proxy() -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
1720        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, 1234))
1721            .to_tcp(None)
1722            .expect("could not create listener config");
1723
1724        let (mut command, channel) =
1725            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
1726        let _jg = thread::spawn(move || {
1727            setup_test_logger!();
1728            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
1729        });
1730
1731        command.blocking().unwrap();
1732        {
1733            let front = RequestTcpFrontend {
1734                cluster_id: String::from("yolo"),
1735                address: SocketAddress::new_v4(127, 0, 0, 1, 1234),
1736                ..Default::default()
1737            };
1738            let backend = sozu_command_lib::response::Backend {
1739                cluster_id: String::from("yolo"),
1740                backend_id: String::from("yolo-0"),
1741                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
1742                load_balancing_parameters: Some(LoadBalancingParams::default()),
1743                sticky_id: None,
1744                backup: None,
1745            };
1746
1747            command
1748                .write_message(&WorkerRequest {
1749                    id: String::from("ID_YOLO1"),
1750                    content: RequestType::AddTcpFrontend(front).into(),
1751                })
1752                .unwrap();
1753            command
1754                .write_message(&WorkerRequest {
1755                    id: String::from("ID_YOLO2"),
1756                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1757                })
1758                .unwrap();
1759        }
1760        {
1761            let front = RequestTcpFrontend {
1762                cluster_id: String::from("yolo"),
1763                address: SocketAddress::new_v4(127, 0, 0, 1, 1235),
1764                ..Default::default()
1765            };
1766            let backend = sozu_command::response::Backend {
1767                cluster_id: String::from("yolo"),
1768                backend_id: String::from("yolo-0"),
1769                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
1770                load_balancing_parameters: Some(LoadBalancingParams::default()),
1771                sticky_id: None,
1772                backup: None,
1773            };
1774            command
1775                .write_message(&WorkerRequest {
1776                    id: String::from("ID_YOLO3"),
1777                    content: RequestType::AddTcpFrontend(front).into(),
1778                })
1779                .unwrap();
1780            command
1781                .write_message(&WorkerRequest {
1782                    id: String::from("ID_YOLO4"),
1783                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1784                })
1785                .unwrap();
1786        }
1787
1788        // not sure why four times
1789        for _ in 0..4 {
1790            println!(
1791                "read_message: {:?}",
1792                command
1793                    .read_message()
1794                    .with_context(|| "could not read message")?
1795            );
1796        }
1797
1798        Ok(command)
1799    }
1800}