sozu_lib/
tcp.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    Interest, Registry, Token,
13    net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14    unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18    ObjectKind,
19    config::MAX_LOOP_ITERATIONS,
20    logging::{EndpointRecord, LogContext},
21    proto::command::request::RequestType,
22};
23
24use crate::{
25    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
26    ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
27    Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
28    backends::{Backend, BackendMap},
29    pool::{Checkout, Pool},
30    protocol::{
31        Pipe,
32        pipe::WebSocketContext,
33        proxy_protocol::{
34            expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
35        },
36    },
37    retry::RetryPolicy,
38    server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
39    socket::{server_bind, stats::socket_rtt},
40    sozu_command::{
41        proto::command::{
42            Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
43            WorkerRequest, WorkerResponse,
44        },
45        ready::Ready,
46        state::ClusterId,
47    },
48    timer::TimeoutContainer,
49};
50
StateMachineBuilder! {
    /// The various Stages of a TCP connection:
    ///
    /// 1. optional (ExpectProxyProtocol | SendProxyProtocol | RelayProxyProtocol)
    /// 2. Pipe
    enum TcpStateMachine {
        // established state: raw data transfer between frontend and backend
        Pipe(Pipe<MioTcpStream, TcpListener>),
        // sends a PROXY protocol header to the backend before piping (ProxyProtocolConfig::SendHeader)
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        // relays the client's PROXY protocol header to the backend (ProxyProtocolConfig::RelayHeader)
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        // expects and consumes a PROXY protocol header from the client (ProxyProtocolConfig::ExpectHeader)
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
63
64/// This macro is defined uniquely in this module to help the tracking of kawa h1
65/// issues inside Sōzu
/// This macro is defined uniquely in this module to help the tracking of
/// TCP proxying issues inside Sōzu.
///
/// Formats a log prefix carrying the session's `LogContext` (request,
/// cluster and backend ids) plus the frontend and backend token numbers;
/// `<none>` is printed when no backend token is set yet.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "TCP\t{}\tSession(frontend={}, backend={})\t >>>",
            $self.log_context(),
            $self.frontend_token.0,
            $self
                .backend_token
                .map(|token| token.0.to_string())
                .unwrap_or_else(|| "<none>".to_string()),
        )
    };
}
79
/// One proxied TCP connection: a frontend (client) socket, an optional
/// backend socket, and the protocol state machine moving data between them.
pub struct TcpSession {
    // backend-side buffer stashed on the session until the state upgrades to Pipe
    backend_buffer: Option<Checkout>,
    // NotConnected / Connecting(start) / Connected
    backend_connected: BackendConnectionStatus,
    // id of the selected backend server, once one has been chosen
    backend_id: Option<String>,
    // event-loop token of the backend socket, once registered
    backend_token: Option<Token>,
    // shared handle to the selected backend, used for retry policy and connection counts
    backend: Option<Rc<RefCell<Backend>>>,
    // cluster this session is routed to (taken from the listener)
    cluster_id: Option<String>,
    // timeout applied to the backend once connected (connect timeout applies before)
    configured_backend_timeout: Duration,
    // number of failed backend connection attempts, compared against CONN_RETRIES
    connection_attempt: u8,
    container_backend_timeout: TimeoutContainer,
    container_frontend_timeout: TimeoutContainer,
    // peer address of the client, if it could be read from the socket
    frontend_address: Option<SocketAddr>,
    // frontend-side buffer stashed on the session until the state upgrades to Pipe
    frontend_buffer: Option<Checkout>,
    frontend_token: Token,
    has_been_closed: SessionIsToBeClosed,
    // instant of the last processed event, for timeout accounting
    last_event: Instant,
    listener: Rc<RefCell<TcpListener>>,
    metrics: SessionMetrics,
    proxy: Rc<RefCell<TcpProxy>>,
    // unique id for this request, used in access logs
    request_id: Ulid,
    state: TcpStateMachine,
}
102
103impl TcpSession {
    /// Create a TCP session in its initial protocol state.
    ///
    /// The starting state depends on `proxy_protocol`:
    /// - `RelayHeader`: the frontend buffer goes to the `RelayProxyProtocol`
    ///   state, the backend buffer is stashed for the later pipe upgrade;
    /// - `ExpectHeader` / `SendHeader`: both buffers are stashed for the
    ///   later pipe upgrade;
    /// - `None`: a `Pipe` is built immediately, consuming both buffers.
    ///
    /// The frontend timeout is armed on `frontend_token`; the backend
    /// timeout starts empty with the connect-timeout duration (switched to
    /// the backend timeout once connected, see `set_back_connected`).
    #[allow(clippy::too_many_arguments)]
    fn new(
        backend_buffer: Checkout,
        backend_id: Option<String>,
        cluster_id: Option<String>,
        configured_backend_timeout: Duration,
        configured_connect_timeout: Duration,
        configured_frontend_timeout: Duration,
        frontend_buffer: Checkout,
        frontend_token: Token,
        listener: Rc<RefCell<TcpListener>>,
        proxy_protocol: Option<ProxyProtocolConfig>,
        proxy: Rc<RefCell<TcpProxy>>,
        socket: MioTcpStream,
        wait_time: Duration,
    ) -> TcpSession {
        let frontend_address = socket.peer_addr().ok();
        // buffers kept on the session (not consumed by the initial state),
        // used later by the upgrade_* methods
        let mut frontend_buffer_session = None;
        let mut backend_buffer_session = None;

        let request_id = Ulid::generate();

        let container_frontend_timeout =
            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);

        let state = match proxy_protocol {
            Some(ProxyProtocolConfig::RelayHeader) => {
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.relay", 1);
                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                    frontend_buffer,
                ))
            }
            Some(ProxyProtocolConfig::ExpectHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.expect", 1);
                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
                    container_frontend_timeout.clone(),
                    socket,
                    frontend_token,
                    request_id,
                ))
            }
            Some(ProxyProtocolConfig::SendHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.send", 1);
                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                ))
            }
            None => {
                // no PROXY protocol: go straight to raw piping
                gauge_add!("protocol.tcp", 1);
                let mut pipe = Pipe::new(
                    backend_buffer,
                    backend_id.clone(),
                    None,
                    None,
                    None,
                    None,
                    cluster_id.clone(),
                    frontend_buffer,
                    frontend_token,
                    socket,
                    listener.clone(),
                    Protocol::TCP,
                    request_id,
                    frontend_address,
                    WebSocketContext::Tcp,
                );
                pipe.set_cluster_id(cluster_id.clone());
                TcpStateMachine::Pipe(pipe)
            }
        };

        // wait_time: how long the accepted connection waited before being handled
        let metrics = SessionMetrics::new(Some(wait_time));
        //FIXME: timeout usage

        TcpSession {
            backend_buffer: backend_buffer_session,
            backend_connected: BackendConnectionStatus::NotConnected,
            backend_id,
            backend_token: None,
            backend: None,
            cluster_id,
            configured_backend_timeout,
            connection_attempt: 0,
            container_backend_timeout,
            container_frontend_timeout,
            frontend_address,
            frontend_buffer: frontend_buffer_session,
            frontend_token,
            has_been_closed: false,
            last_event: Instant::now(),
            listener,
            metrics,
            proxy,
            request_id,
            state,
        }
    }
214
    /// Emit the access log record for this session and register
    /// end-of-session metrics.
    ///
    /// Called when the session terminates (frontend/backend hang-up paths).
    fn log_request(&self) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        self.metrics.register_end_of_session(&context);
        info_access!(
            // count access logs that could not be sent
            on_failure: { incr!("unsent-access-logs") },
            message: None,
            context,
            session_address: self.frontend_address,
            backend_address: None,
            protocol: "TCP",
            endpoint: EndpointRecord::Tcp,
            // tags are keyed by the listener's own address
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.state.front_socket()),
            server_rtt: None,
            user_agent: None,
            service_time: self.metrics.service_time(),
            response_time: self.metrics.backend_response_time(),
            request_time: self.metrics.request_time(),
            bytes_in: self.metrics.bin,
            bytes_out: self.metrics.bout,
            otel: None,
        );
    }
239
240    fn front_hup(&mut self) -> SessionResult {
241        match &mut self.state {
242            TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
243            _ => {
244                self.log_request();
245                SessionResult::Close
246            }
247        }
248    }
249
250    fn back_hup(&mut self) -> SessionResult {
251        match &mut self.state {
252            TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
253            _ => {
254                self.log_request();
255                SessionResult::Close
256            }
257        }
258    }
259
260    fn log_context(&self) -> LogContext<'_> {
261        LogContext {
262            request_id: self.request_id,
263            cluster_id: self.cluster_id.as_deref(),
264            backend_id: self.backend_id.as_deref(),
265        }
266    }
267
268    fn readable(&mut self) -> SessionResult {
269        if !self.container_frontend_timeout.reset() {
270            error!(
271                "{} Could not reset frontend timeout on readable",
272                log_context!(self)
273            );
274        }
275        if self.backend_connected == BackendConnectionStatus::Connected
276            && !self.container_backend_timeout.reset()
277        {
278            error!(
279                "{} Could not reset backend timeout on readable",
280                log_context!(self)
281            );
282        }
283        match &mut self.state {
284            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
285            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
286            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
287            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
288            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
289        }
290    }
291
292    fn writable(&mut self) -> SessionResult {
293        match &mut self.state {
294            TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
295            _ => SessionResult::Continue,
296        }
297    }
298
299    fn back_readable(&mut self) -> SessionResult {
300        if !self.container_frontend_timeout.reset() {
301            error!(
302                "{} Could not reset frontend timeout on back_readable",
303                log_context!(self)
304            );
305        }
306        if !self.container_backend_timeout.reset() {
307            error!(
308                "{} Could not reset backend timeout on back_readable",
309                log_context!(self)
310            );
311        }
312
313        match &mut self.state {
314            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
315            _ => SessionResult::Continue,
316        }
317    }
318
319    fn back_writable(&mut self) -> SessionResult {
320        match &mut self.state {
321            TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
322            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
323            TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
324            TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
325            TcpStateMachine::FailedUpgrade(_) => {
326                unreachable!()
327            }
328        }
329    }
330
331    fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
332        match &mut self.state {
333            TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
334            TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
335            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
336            TcpStateMachine::ExpectProxyProtocol(_) => None,
337            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
338        }
339    }
340
341    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
342        let new_state = match self.state.take() {
343            TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
344            TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
345            TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
346            TcpStateMachine::Pipe(_) => None,
347            TcpStateMachine::FailedUpgrade(_) => todo!(),
348        };
349
350        match new_state {
351            Some(state) => {
352                self.state = state;
353                false
354            } // The state stays FailedUpgrade, but the Session should be closed right after
355
356            None => true,
357        }
358    }
359
360    fn upgrade_send(
361        &mut self,
362        send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
363    ) -> Option<TcpStateMachine> {
364        if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
365            let mut pipe = send_proxy_protocol.into_pipe(
366                self.frontend_buffer.take().unwrap(),
367                self.backend_buffer.take().unwrap(),
368                self.listener.clone(),
369            );
370
371            pipe.set_cluster_id(self.cluster_id.clone());
372            gauge_add!("protocol.proxy.send", -1);
373            gauge_add!("protocol.tcp", 1);
374            return Some(TcpStateMachine::Pipe(pipe));
375        }
376
377        error!(
378            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
379            log_context!(self)
380        );
381        None
382    }
383
384    fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
385        if self.backend_buffer.is_some() {
386            let mut pipe =
387                rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
388            pipe.set_cluster_id(self.cluster_id.clone());
389            gauge_add!("protocol.proxy.relay", -1);
390            gauge_add!("protocol.tcp", 1);
391            return Some(TcpStateMachine::Pipe(pipe));
392        }
393
394        error!(
395            "{} Missing the backend buffer queue, we can't switch to a pipe",
396            log_context!(self)
397        );
398        None
399    }
400
401    fn upgrade_expect(
402        &mut self,
403        epp: ExpectProxyProtocol<MioTcpStream>,
404    ) -> Option<TcpStateMachine> {
405        if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
406            let mut pipe = epp.into_pipe(
407                self.frontend_buffer.take().unwrap(),
408                self.backend_buffer.take().unwrap(),
409                None,
410                None,
411                self.listener.clone(),
412            );
413
414            pipe.set_cluster_id(self.cluster_id.clone());
415            gauge_add!("protocol.proxy.expect", -1);
416            gauge_add!("protocol.tcp", 1);
417            return Some(TcpStateMachine::Pipe(pipe));
418        }
419
420        error!(
421            "{} Missing the backend buffer queue, we can't switch to a pipe",
422            log_context!(self)
423        );
424        None
425    }
426
427    fn front_readiness(&mut self) -> &mut Readiness {
428        match &mut self.state {
429            TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
430            TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
431            TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
432            TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
433            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
434        }
435    }
436
437    fn back_readiness(&mut self) -> Option<&mut Readiness> {
438        match &mut self.state {
439            TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
440            TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
441            TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
442            TcpStateMachine::ExpectProxyProtocol(_) => None,
443            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
444        }
445    }
446
447    fn set_back_socket(&mut self, socket: MioTcpStream) {
448        match &mut self.state {
449            TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
450            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
451            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
452            TcpStateMachine::ExpectProxyProtocol(_) => {
453                error!(
454                    "{} We should not set the back socket for the expect proxy protocol",
455                    log_context!(self)
456                );
457                panic!(
458                    "{} We should not set the back socket for the expect proxy protocol",
459                    log_context!(self)
460                );
461            }
462            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
463        }
464    }
465
466    fn set_back_token(&mut self, token: Token) {
467        self.backend_token = Some(token);
468
469        match &mut self.state {
470            TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
471            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
472            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
473            TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
474            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
475        }
476    }
477
478    fn set_backend_id(&mut self, id: String) {
479        self.backend_id = Some(id.clone());
480        if let TcpStateMachine::Pipe(pipe) = &mut self.state {
481            pipe.set_backend_id(Some(id));
482        }
483    }
484
    /// Current connection status of the backend socket.
    fn back_connected(&self) -> BackendConnectionStatus {
        self.backend_connected
    }
488
    /// Update the backend connection status.
    ///
    /// On transition to `Connected`: bumps connection gauges, switches the
    /// backend timeout from the connect duration to the configured backend
    /// duration, notifies the SendProxyProtocol state, and updates the
    /// backend's retry policy (emitting a BackendUp event if the backend was
    /// previously considered down).
    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
        let last = self.backend_connected;
        self.backend_connected = status;

        if status == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", 1);
            gauge_add!(
                "connections_per_backend",
                1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );

            // the back timeout was of connect_timeout duration before,
            // now that we're connected, move to backend_timeout duration
            self.container_backend_timeout
                .set_duration(self.configured_backend_timeout);
            self.container_frontend_timeout.reset();

            // the send state can start writing its PROXY header now
            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
                spp.set_back_connected(BackendConnectionStatus::Connected);
            }

            if let Some(backend) = self.backend.as_ref() {
                let mut backend = backend.borrow_mut();

                // the backend was marked down and just answered: report it up
                if backend.retry_policy.is_down() {
                    incr!(
                        "backend.up",
                        self.cluster_id.as_deref(),
                        self.metrics.backend_id.as_deref()
                    );
                    info!(
                        "backend server {} at {} is up",
                        backend.backend_id, backend.address
                    );
                    push_event(Event {
                        kind: EventKind::BackendUp as i32,
                        backend_id: Some(backend.backend_id.to_owned()),
                        address: Some(backend.address.into()),
                        cluster_id: None,
                    });
                }

                // record how long the connection took to establish
                if let BackendConnectionStatus::Connecting(start) = last {
                    backend.set_connection_time(Instant::now() - start);
                }

                // successful connection, reset the failure counter
                backend.failures = 0;
                backend.retry_policy.succeed();
            }
        }
    }
543
544    fn remove_backend(&mut self) {
545        if let Some(backend) = self.backend.take() {
546            (*backend.borrow_mut()).dec_connections();
547        }
548
549        self.backend_token = None;
550    }
551
    /// Record a failed connection attempt on the backend.
    ///
    /// Increments the backend's failure counter and its retry policy; if
    /// this failure is what takes the backend from available to down, emits
    /// a BackendDown event and the corresponding metrics.
    fn fail_backend_connection(&mut self) {
        if let Some(backend) = self.backend.as_ref() {
            let backend = &mut *backend.borrow_mut();
            backend.failures += 1;

            // remember the state before this failure to detect the up->down edge
            let already_unavailable = backend.retry_policy.is_down();
            backend.retry_policy.fail();
            incr!(
                "backend.connections.error",
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
            if !already_unavailable && backend.retry_policy.is_down() {
                error!(
                    "backend server {} at {} is down",
                    backend.backend_id, backend.address
                );
                incr!(
                    "backend.down",
                    self.cluster_id.as_deref(),
                    self.metrics.backend_id.as_deref()
                );

                push_event(Event {
                    kind: EventKind::BackendDown as i32,
                    backend_id: Some(backend.backend_id.to_owned()),
                    address: Some(backend.address.into()),
                    cluster_id: None,
                });
            }
        }
    }
584
585    pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
586        match self.back_socket_mut() {
587            Some(ref mut s) => {
588                let mut tmp = [0u8; 1];
589                let res = s.peek(&mut tmp[..]);
590
591                match res {
592                    // if the socket is half open, it will report 0 bytes read (EOF)
593                    Ok(0) => false,
594                    Ok(_) => true,
595                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
596                }
597            }
598            None => false,
599        }
600    }
601
    /// Cancel both the frontend and backend timeouts, typically when the
    /// session is being torn down.
    pub fn cancel_timeouts(&mut self) {
        self.container_frontend_timeout.cancel();
        self.container_backend_timeout.cancel();
    }
606
    /// Core event-processing loop of the session.
    ///
    /// First manages the backend connection lifecycle (retry on failed
    /// connect, mark connected on first backend event, initial connect),
    /// then handles a frontend hang-up, then drains all pending readiness
    /// (front/back read/write/hup/error) in a loop bounded by
    /// MAX_LOOP_ITERATIONS to defend against infinite-loop bugs.
    fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
        let mut counter = 0;

        let back_connected = self.back_connected();
        if back_connected.is_connecting() {
            // HUP with an unusable socket means the connect failed
            if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
                //retry connecting the backend
                debug!("error connecting to backend, trying again");
                self.connection_attempt += 1;
                self.fail_backend_connection();

                // trigger a backend reconnection
                self.close_backend();
                let connection_result = self.connect_to_backend(session.clone());
                if let Err(err) = &connection_result {
                    error!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    );
                }

                if let Some(state_result) = handle_connection_result(connection_result) {
                    return state_result;
                }
            } else if self.back_readiness().unwrap().event != Ready::EMPTY {
                // any backend event while connecting means the connect succeeded
                self.connection_attempt = 0;
                self.set_back_connected(BackendConnectionStatus::Connected);
            }
        } else if back_connected == BackendConnectionStatus::NotConnected {
            // first attempt to connect this session to a backend
            let connection_result = self.connect_to_backend(session.clone());
            if let Err(err) = &connection_result {
                error!(
                    "{} Error connecting to backend: {}",
                    log_context!(self),
                    err
                );
            }

            if let Some(state_result) = handle_connection_result(connection_result) {
                return state_result;
            }
        }

        // frontend hang-up: let the state decide, clear HUP if it continues
        if self.front_readiness().event.is_hup() {
            let session_result = self.front_hup();
            if session_result == SessionResult::Continue {
                self.front_readiness().event.remove(Ready::HUP);
            }
            return session_result;
        }

        while counter < MAX_LOOP_ITERATIONS {
            // only process events that the state is actually interested in
            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            trace!(
                "{} Frontend interest({:?}) and backend interest({:?})",
                log_context!(self),
                front_interest,
                back_interest
            );

            if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
                break;
            }

            // backend hung up but the frontend can't be written to yet:
            // wait for frontend writability before draining
            if self
                .back_readiness()
                .map(|r| r.event.is_hup())
                .unwrap_or(false)
                && self.front_readiness().interest.is_writable()
                && !self.front_readiness().event.is_writable()
            {
                break;
            }

            if front_interest.is_readable() {
                let session_result = self.readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_writable() {
                let session_result = self.back_writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_readable() {
                let session_result = self.back_readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_writable() {
                let session_result = self.writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_hup() {
                let session_result = self.back_hup();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_error() {
                error!(
                    "{} Frontend socket error, disconnecting",
                    log_context!(self)
                );
                // drop all interest so no further events are processed
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                return SessionResult::Close;
            }

            if back_interest.is_error() && self.back_hup() == SessionResult::Close {
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                error!("{} backend socket error, disconnecting", log_context!(self));
                return SessionResult::Close;
            }

            counter += 1;
        }

        // loop guard tripped: assume a bug and close, dumping diagnostics
        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self),
                MAX_LOOP_ITERATIONS
            );

            incr!("tcp.infinite_loop.error");

            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            let back = self.back_readiness().cloned();

            error!(
                "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
                log_context!(self),
                self.front_readiness(),
                back,
                front_interest,
                back_interest
            );

            self.print_session();

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
781
    /// TCP session closes its backend on its own, without deferring this task to the state.
    ///
    /// Deregisters the backend socket from the event loop and frees its slab
    /// slot, detaches the backend handle, shuts the socket down, decrements
    /// connection gauges if it was fully connected, and resets the status to
    /// NotConnected.
    fn close_backend(&mut self) {
        if let (Some(token), Some(fd)) = (
            self.backend_token,
            self.back_socket_mut().map(|s| s.as_raw_fd()),
        ) {
            let proxy = self.proxy.borrow();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering socket({:?}): {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }

            // free the backend token's slot in the session slab
            proxy.sessions.borrow_mut().slab.try_remove(token.0);
        }
        self.remove_backend();

        let back_connected = self.back_connected();
        if back_connected != BackendConnectionStatus::NotConnected {
            if let Some(r) = self.back_readiness() {
                r.event = Ready::EMPTY;
            }

            let log_context = log_context!(self);
            if let Some(sock) = self.back_socket_mut() {
                if let Err(e) = sock.shutdown(Shutdown::Both) {
                    // NotConnected on shutdown is expected and not worth logging
                    if e.kind() != ErrorKind::NotConnected {
                        error!(
                            "{} Error closing back socket({:?}): {:?}",
                            log_context, sock, e
                        );
                    }
                }
            }
        }

        // only a fully connected backend was counted in the gauges
        if back_connected == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", -1);
            gauge_add!(
                "connections_per_backend",
                -1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
        }

        self.set_back_connected(BackendConnectionStatus::NotConnected);
    }
833
    /// Pick a backend for the listener's cluster, open and register the backend
    /// socket, and insert the session a second time in the slab under a new
    /// backend token.
    ///
    /// Errors if the listener has no cluster, the retry budget is spent,
    /// the session slab is full, or no backend could provide a stream.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        // for TCP, the cluster is attached to the listener itself
        let cluster_id = self
            .listener
            .borrow()
            .cluster_id
            .clone()
            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;

        self.cluster_id = Some(cluster_id.clone());

        if self.connection_attempt >= CONN_RETRIES {
            error!(
                "{} Max connection attempt reached ({})",
                log_context!(self),
                self.connection_attempt
            );
            return Err(BackendConnectionError::MaxConnectionRetries(Some(
                cluster_id,
            )));
        }

        if self.proxy.borrow().sessions.borrow().at_capacity() {
            return Err(BackendConnectionError::MaxSessionsMemory);
        }

        // the backend map load-balances and opens the TCP stream
        let (backend, mut stream) = self
            .proxy
            .borrow()
            .backends
            .borrow_mut()
            .backend_from_cluster_id(&cluster_id)
            .map_err(BackendConnectionError::Backend)?;

        // best effort: failing to set nodelay is logged but not fatal
        if let Err(e) = stream.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }
        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());

        // reserve a slab entry for the backend half of the session,
        // scoped so the proxy/sessions borrows are released right after
        let back_token = {
            let proxy = self.proxy.borrow();
            let mut s = proxy.sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            let back_token = Token(entry.key());
            let _entry = entry.insert(session_rc.clone());
            back_token
        };

        if let Err(e) = self.proxy.borrow().registry.register(
            &mut stream,
            back_token,
            Interest::READABLE | Interest::WRITABLE,
        ) {
            error!(
                "{} Error registering back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }

        // arm the backend timeout on the new token
        self.container_backend_timeout.set(back_token);

        self.set_back_token(back_token);
        self.set_back_socket(stream);

        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
        self.metrics.backend_start();
        self.set_backend_id(backend.borrow().backend_id.clone());

        Ok(BackendConnectAction::New)
    }
913}
914
915impl ProxySession for TcpSession {
916    fn close(&mut self) {
917        if self.has_been_closed {
918            return;
919        }
920
921        // TODO: the state should handle the timeouts
922        trace!("{} Closing TCP session", log_context!(self));
923        self.metrics.service_stop();
924
925        // Restore gauges
926        match self.state.marker() {
927            StateMarker::Pipe => gauge_add!("protocol.tcp", -1),
928            StateMarker::SendProxyProtocol => gauge_add!("protocol.proxy.send", -1),
929            StateMarker::RelayProxyProtocol => gauge_add!("protocol.proxy.relay", -1),
930            StateMarker::ExpectProxyProtocol => gauge_add!("protocol.proxy.expect", -1),
931        }
932
933        if self.state.failed() {
934            match self.state.marker() {
935                StateMarker::Pipe => incr!("tcp.upgrade.pipe.failed"),
936                StateMarker::SendProxyProtocol => incr!("tcp.upgrade.send.failed"),
937                StateMarker::RelayProxyProtocol => incr!("tcp.upgrade.relay.failed"),
938                StateMarker::ExpectProxyProtocol => incr!("tcp.upgrade.expect.failed"),
939            }
940            return;
941        }
942
943        self.cancel_timeouts();
944
945        let front_socket = self.state.front_socket();
946        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
947            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
948            if e.kind() != ErrorKind::NotConnected {
949                error!(
950                    "{} Error shutting down front socket({:?}): {:?}",
951                    log_context!(self),
952                    front_socket,
953                    e
954                );
955            }
956        }
957
958        // deregister the frontend and remove it, in a separate scope to drop proxy when done
959        {
960            let proxy = self.proxy.borrow();
961            let fd = front_socket.as_raw_fd();
962            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
963                error!(
964                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
965                    log_context!(self),
966                    fd,
967                    e
968                );
969            }
970            proxy
971                .sessions
972                .borrow_mut()
973                .slab
974                .try_remove(self.frontend_token.0);
975        }
976
977        self.close_backend();
978        self.has_been_closed = true;
979    }
980
981    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
982        if self.frontend_token == token {
983            self.container_frontend_timeout.triggered();
984            return true;
985        }
986        if self.backend_token == Some(token) {
987            self.container_backend_timeout.triggered();
988            return true;
989        }
990        // invalid token, obsolete timeout triggered
991        false
992    }
993
994    fn protocol(&self) -> Protocol {
995        Protocol::TCP
996    }
997
998    fn update_readiness(&mut self, token: Token, events: Ready) {
999        trace!(
1000            "{} token {:?} got event {}",
1001            log_context!(self),
1002            token,
1003            super::ready_to_string(events)
1004        );
1005
1006        self.last_event = Instant::now();
1007        self.metrics.wait_start();
1008
1009        if self.frontend_token == token {
1010            self.front_readiness().event = self.front_readiness().event | events;
1011        } else if self.backend_token == Some(token) {
1012            if let Some(r) = self.back_readiness() {
1013                r.event |= events;
1014            }
1015        }
1016    }
1017
1018    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1019        self.metrics.service_start();
1020
1021        let session_result = self.ready_inner(session.clone());
1022
1023        let to_bo_closed = match session_result {
1024            SessionResult::Close => true,
1025            SessionResult::Continue => false,
1026            SessionResult::Upgrade => match self.upgrade() {
1027                false => self.ready(session),
1028                true => true,
1029            },
1030        };
1031
1032        self.metrics.service_stop();
1033        to_bo_closed
1034    }
1035
1036    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1037        true
1038    }
1039
1040    fn last_event(&self) -> Instant {
1041        self.last_event
1042    }
1043
1044    fn print_session(&self) {
1045        let state: String = match &self.state {
1046            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
1047            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
1048            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
1049            TcpStateMachine::Pipe(_) => String::from("TCP"),
1050            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
1051        };
1052
1053        let front_readiness = match &self.state {
1054            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
1055            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
1056            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
1057            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
1058            TcpStateMachine::FailedUpgrade(_) => None,
1059        };
1060
1061        let back_readiness = match &self.state {
1062            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
1063            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
1064            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
1065            TcpStateMachine::ExpectProxyProtocol(_) => None,
1066            TcpStateMachine::FailedUpgrade(_) => None,
1067        };
1068
1069        error!(
1070            "\
1071{} Session ({:?})
1072\tFrontend:
1073\t\ttoken: {:?}\treadiness: {:?}
1074\tBackend:
1075\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
1076            log_context!(self),
1077            state,
1078            self.frontend_token,
1079            front_readiness,
1080            self.backend_token,
1081            back_readiness,
1082            self.backend_connected,
1083            self.cluster_id
1084        );
1085        error!("Metrics: {:?}", self.metrics);
1086    }
1087
1088    fn frontend_token(&self) -> Token {
1089        self.frontend_token
1090    }
1091}
1092
/// A TCP listen socket and the single cluster it forwards to.
///
/// Created inactive by `TcpProxy::add_listener`; the OS socket is only
/// bound and registered once `activate` is called.
pub struct TcpListener {
    // whether activate() has already registered the socket (alias for bool)
    active: SessionIsToBeClosed,
    // address the listener is (or will be) bound to
    address: SocketAddr,
    // cluster attached by add_tcp_front; None until a frontend is added
    cluster_id: Option<String>,
    config: TcpListenerConfig,
    // the bound socket, present only once activated
    listener: Option<MioTcpListener>,
    // logging tags, keyed by frontend address string
    tags: BTreeMap<String, CachedTags>,
    token: Token,
}
1102
1103impl ListenerHandler for TcpListener {
1104    fn get_addr(&self) -> &SocketAddr {
1105        &self.address
1106    }
1107
1108    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1109        self.tags.get(key)
1110    }
1111
1112    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1113        match tags {
1114            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1115            None => self.tags.remove(&key),
1116        };
1117    }
1118}
1119
1120impl TcpListener {
1121    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1122        Ok(TcpListener {
1123            cluster_id: None,
1124            listener: None,
1125            token,
1126            address: config.address.into(),
1127            config,
1128            active: false,
1129            tags: BTreeMap::new(),
1130        })
1131    }
1132
1133    pub fn activate(
1134        &mut self,
1135        registry: &Registry,
1136        tcp_listener: Option<MioTcpListener>,
1137    ) -> Result<Token, ProxyError> {
1138        if self.active {
1139            return Ok(self.token);
1140        }
1141
1142        let mut listener = match tcp_listener {
1143            Some(listener) => listener,
1144            None => {
1145                let address = self.config.address.into();
1146                server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1147            }
1148        };
1149
1150        registry
1151            .register(&mut listener, self.token, Interest::READABLE)
1152            .map_err(ProxyError::RegisterListener)?;
1153
1154        self.listener = Some(listener);
1155        self.active = true;
1156        Ok(self.token)
1157    }
1158}
1159
1160fn handle_connection_result(
1161    connection_result: Result<BackendConnectAction, BackendConnectionError>,
1162) -> Option<SessionResult> {
1163    match connection_result {
1164        // reuse connection or send a default answer, we can continue
1165        Ok(BackendConnectAction::Reuse) => None,
1166        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1167            // we must wait for an event
1168            Some(SessionResult::Continue)
1169        }
1170        Err(_) => {
1171            // in case of BackendConnectionError::Backend(BackendError::ConnectionFailures(..))
1172            // we may want to retry instead of closing
1173            Some(SessionResult::Close)
1174        }
1175    }
1176}
1177
/// Cluster-level settings relevant to the TCP proxy,
/// stored per cluster id in `TcpProxy::configs`.
#[derive(Debug)]
pub struct ClusterConfiguration {
    // whether (and how) to forward client info to backends via the PROXY protocol
    proxy_protocol: Option<ProxyProtocolConfig>,
    // Uncomment this when implementing new load balancing algorithms
    // load_balancing: LoadBalancingAlgorithms,
}
1184
/// The TCP proxy: owns the listeners and cluster configurations, and shares
/// the session slab, buffer pool and backend map with the rest of the worker.
pub struct TcpProxy {
    // cluster id -> token of the listener serving that cluster's frontend
    fronts: HashMap<String, Token>,
    // backends, shared with the server and other proxies
    backends: Rc<RefCell<BackendMap>>,
    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
    // per-cluster settings (e.g. PROXY protocol), keyed by cluster id
    configs: HashMap<ClusterId, ClusterConfiguration>,
    // event-loop registry used to (de)register all sockets
    registry: Registry,
    sessions: Rc<RefCell<SessionManager>>,
    // buffer pool checked out from when creating sessions
    pool: Rc<RefCell<Pool>>,
}
1194
1195impl TcpProxy {
1196    pub fn new(
1197        registry: Registry,
1198        sessions: Rc<RefCell<SessionManager>>,
1199        pool: Rc<RefCell<Pool>>,
1200        backends: Rc<RefCell<BackendMap>>,
1201    ) -> TcpProxy {
1202        TcpProxy {
1203            backends,
1204            listeners: HashMap::new(),
1205            configs: HashMap::new(),
1206            fronts: HashMap::new(),
1207            registry,
1208            sessions,
1209            pool,
1210        }
1211    }
1212
1213    pub fn add_listener(
1214        &mut self,
1215        config: TcpListenerConfig,
1216        token: Token,
1217    ) -> Result<Token, ProxyError> {
1218        match self.listeners.entry(token) {
1219            Entry::Vacant(entry) => {
1220                let tcp_listener =
1221                    TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1222                entry.insert(Rc::new(RefCell::new(tcp_listener)));
1223                Ok(token)
1224            }
1225            _ => Err(ProxyError::ListenerAlreadyPresent),
1226        }
1227    }
1228
1229    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1230        let len = self.listeners.len();
1231
1232        self.listeners.retain(|_, l| l.borrow().address != address);
1233        self.listeners.len() < len
1234    }
1235
1236    pub fn activate_listener(
1237        &self,
1238        addr: &SocketAddr,
1239        tcp_listener: Option<MioTcpListener>,
1240    ) -> Result<Token, ProxyError> {
1241        let listener = self
1242            .listeners
1243            .values()
1244            .find(|listener| listener.borrow().address == *addr)
1245            .ok_or(ProxyError::NoListenerFound(*addr))?;
1246
1247        listener.borrow_mut().activate(&self.registry, tcp_listener)
1248    }
1249
1250    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1251        self.listeners
1252            .values()
1253            .filter_map(|listener| {
1254                let mut owned = listener.borrow_mut();
1255                if let Some(listener) = owned.listener.take() {
1256                    return Some((owned.address, listener));
1257                }
1258
1259                None
1260            })
1261            .collect()
1262    }
1263
1264    pub fn give_back_listener(
1265        &mut self,
1266        address: SocketAddr,
1267    ) -> Result<(Token, MioTcpListener), ProxyError> {
1268        let listener = self
1269            .listeners
1270            .values()
1271            .find(|listener| listener.borrow().address == address)
1272            .ok_or(ProxyError::NoListenerFound(address))?;
1273
1274        let mut owned = listener.borrow_mut();
1275
1276        let taken_listener = owned
1277            .listener
1278            .take()
1279            .ok_or(ProxyError::UnactivatedListener)?;
1280
1281        Ok((owned.token, taken_listener))
1282    }
1283
1284    pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1285        let address = front.address.into();
1286
1287        let mut listener = self
1288            .listeners
1289            .values()
1290            .find(|l| l.borrow().address == address)
1291            .ok_or(ProxyError::NoListenerFound(address))?
1292            .borrow_mut();
1293
1294        self.fronts
1295            .insert(front.cluster_id.to_string(), listener.token);
1296        listener.set_tags(address.to_string(), Some(front.tags));
1297        listener.cluster_id = Some(front.cluster_id);
1298        Ok(())
1299    }
1300
1301    pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1302        let address = front.address.into();
1303
1304        let mut listener = match self
1305            .listeners
1306            .values()
1307            .find(|l| l.borrow().address == address)
1308        {
1309            Some(l) => l.borrow_mut(),
1310            None => return Err(ProxyError::NoListenerFound(address)),
1311        };
1312
1313        listener.set_tags(address.to_string(), None);
1314        if let Some(cluster_id) = listener.cluster_id.take() {
1315            self.fronts.remove(&cluster_id);
1316        }
1317        Ok(())
1318    }
1319}
1320
1321impl ProxyConfiguration for TcpProxy {
1322    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1323        let request_type = match message.content.request_type {
1324            Some(t) => t,
1325            None => return WorkerResponse::error(message.id, "Empty request"),
1326        };
1327        match request_type {
1328            RequestType::AddTcpFrontend(front) => {
1329                if let Err(err) = self.add_tcp_front(front) {
1330                    return WorkerResponse::error(message.id, err);
1331                }
1332
1333                WorkerResponse::ok(message.id)
1334            }
1335            RequestType::RemoveTcpFrontend(front) => {
1336                if let Err(err) = self.remove_tcp_front(front) {
1337                    return WorkerResponse::error(message.id, err);
1338                }
1339
1340                WorkerResponse::ok(message.id)
1341            }
1342            RequestType::SoftStop(_) => {
1343                info!("{} processing soft shutdown", message.id);
1344                let listeners: HashMap<_, _> = self.listeners.drain().collect();
1345                for (_, l) in listeners.iter() {
1346                    l.borrow_mut()
1347                        .listener
1348                        .take()
1349                        .map(|mut sock| self.registry.deregister(&mut sock));
1350                }
1351                WorkerResponse::processing(message.id)
1352            }
1353            RequestType::HardStop(_) => {
1354                info!("{} hard shutdown", message.id);
1355                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1356                for (_, l) in listeners.drain() {
1357                    l.borrow_mut()
1358                        .listener
1359                        .take()
1360                        .map(|mut sock| self.registry.deregister(&mut sock));
1361                }
1362                WorkerResponse::ok(message.id)
1363            }
1364            RequestType::Status(_) => {
1365                info!("{} status", message.id);
1366                WorkerResponse::ok(message.id)
1367            }
1368            RequestType::AddCluster(cluster) => {
1369                let config = ClusterConfiguration {
1370                    proxy_protocol: cluster
1371                        .proxy_protocol
1372                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1373                    //load_balancing: cluster.load_balancing,
1374                };
1375                self.configs.insert(cluster.cluster_id, config);
1376                WorkerResponse::ok(message.id)
1377            }
1378            RequestType::RemoveCluster(cluster_id) => {
1379                self.configs.remove(&cluster_id);
1380                WorkerResponse::ok(message.id)
1381            }
1382            RequestType::RemoveListener(remove) => {
1383                if !self.remove_listener(remove.address.into()) {
1384                    WorkerResponse::error(
1385                        message.id,
1386                        format!("no TCP listener to remove at address {:?}", remove.address),
1387                    )
1388                } else {
1389                    WorkerResponse::ok(message.id)
1390                }
1391            }
1392            command => {
1393                debug!(
1394                    "{} unsupported message for TCP proxy, ignoring {:?}",
1395                    message.id, command
1396                );
1397                WorkerResponse::error(message.id, "unsupported message")
1398            }
1399        }
1400    }
1401
1402    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1403        let internal_token = Token(token.0);
1404        if let Some(listener) = self.listeners.get(&internal_token) {
1405            if let Some(tcp_listener) = &listener.borrow().listener {
1406                tcp_listener
1407                    .accept()
1408                    .map(|(frontend_sock, _)| frontend_sock)
1409                    .map_err(|e| match e.kind() {
1410                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
1411                        _ => {
1412                            error!("accept() IO error: {:?}", e);
1413                            AcceptError::IoError
1414                        }
1415                    })
1416            } else {
1417                Err(AcceptError::IoError)
1418            }
1419        } else {
1420            Err(AcceptError::IoError)
1421        }
1422    }
1423
1424    fn create_session(
1425        &mut self,
1426        mut frontend_sock: MioTcpStream,
1427        token: ListenToken,
1428        wait_time: Duration,
1429        proxy: Rc<RefCell<Self>>,
1430    ) -> Result<(), AcceptError> {
1431        let listener_token = Token(token.0);
1432
1433        let listener = self
1434            .listeners
1435            .get(&listener_token)
1436            .ok_or(AcceptError::IoError)?;
1437
1438        let owned = listener.borrow();
1439        let mut pool = self.pool.borrow_mut();
1440
1441        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1442            (Some(fb), Some(bb)) => (fb, bb),
1443            _ => {
1444                error!("could not get buffers from pool");
1445                error!(
1446                    "Buffer capacity has been reached, stopping to accept new connections for now"
1447                );
1448                gauge!("accept_queue.backpressure", 1);
1449                self.sessions.borrow_mut().can_accept = false;
1450
1451                return Err(AcceptError::BufferCapacityReached);
1452            }
1453        };
1454
1455        if owned.cluster_id.is_none() {
1456            error!(
1457                "listener at address {:?} has no linked cluster",
1458                owned.address
1459            );
1460            return Err(AcceptError::IoError);
1461        }
1462
1463        let proxy_protocol = self
1464            .configs
1465            .get(owned.cluster_id.as_ref().unwrap())
1466            .and_then(|c| c.proxy_protocol);
1467
1468        if let Err(e) = frontend_sock.set_nodelay(true) {
1469            error!(
1470                "error setting nodelay on front socket({:?}): {:?}",
1471                frontend_sock, e
1472            );
1473        }
1474
1475        let mut session_manager = self.sessions.borrow_mut();
1476        let entry = session_manager.slab.vacant_entry();
1477        let frontend_token = Token(entry.key());
1478
1479        if let Err(register_error) = self.registry.register(
1480            &mut frontend_sock,
1481            frontend_token,
1482            Interest::READABLE | Interest::WRITABLE,
1483        ) {
1484            error!(
1485                "error registering front socket({:?}): {:?}",
1486                frontend_sock, register_error
1487            );
1488            return Err(AcceptError::RegisterError);
1489        }
1490
1491        let session = TcpSession::new(
1492            back_buffer,
1493            None,
1494            owned.cluster_id.clone(),
1495            Duration::from_secs(owned.config.back_timeout as u64),
1496            Duration::from_secs(owned.config.connect_timeout as u64),
1497            Duration::from_secs(owned.config.front_timeout as u64),
1498            front_buffer,
1499            frontend_token,
1500            listener.clone(),
1501            proxy_protocol,
1502            proxy,
1503            frontend_sock,
1504            wait_time,
1505        );
1506        incr!("tcp.requests");
1507
1508        let session = Rc::new(RefCell::new(session));
1509        entry.insert(session);
1510
1511        Ok(())
1512    }
1513}
1514
1515pub mod testing {
1516    use crate::testing::*;
1517
1518    /// This is not directly used by Sōzu but is available for example and testing purposes
1519    pub fn start_tcp_worker(
1520        config: TcpListenerConfig,
1521        max_buffers: usize,
1522        buffer_size: usize,
1523        channel: ProxyChannel,
1524    ) -> anyhow::Result<()> {
1525        let address = config.address.into();
1526
1527        let ServerParts {
1528            event_loop,
1529            registry,
1530            sessions,
1531            pool,
1532            backends,
1533            client_scm_socket: _,
1534            server_scm_socket,
1535            server_config,
1536        } = prebuild_server(max_buffers, buffer_size, true)?;
1537
1538        let token = {
1539            let mut sessions = sessions.borrow_mut();
1540            let entry = sessions.slab.vacant_entry();
1541            let key = entry.key();
1542            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1543                protocol: Protocol::TCPListen,
1544            })));
1545            Token(key)
1546        };
1547
1548        let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1549        proxy
1550            .add_listener(config, token)
1551            .with_context(|| "Failed at creating adding the listener")?;
1552        proxy
1553            .activate_listener(&address, None)
1554            .with_context(|| "Failed at creating activating the listener")?;
1555
1556        let mut server = Server::new(
1557            event_loop,
1558            channel,
1559            server_scm_socket,
1560            sessions,
1561            pool,
1562            backends,
1563            None,
1564            None,
1565            Some(proxy),
1566            server_config,
1567            None,
1568            false,
1569        )
1570        .with_context(|| "Failed at creating server")?;
1571
1572        debug!("starting event loop");
1573        server.run();
1574        debug!("ending event loop");
1575        Ok(())
1576    }
1577}
1578
1579#[cfg(test)]
1580mod tests {
1581    use std::{
1582        io::{Read, Write},
1583        net::{Shutdown, TcpListener, TcpStream},
1584        str,
1585        sync::{
1586            Arc, Barrier,
1587            atomic::{AtomicBool, Ordering},
1588        },
1589        thread,
1590    };
1591
1592    use sozu_command::{
1593        channel::Channel,
1594        config::ListenerBuilder,
1595        proto::command::{
1596            LoadBalancingParams, RequestTcpFrontend, SocketAddress, WorkerRequest, WorkerResponse,
1597            request::RequestType,
1598        },
1599    };
1600
1601    use super::testing::start_tcp_worker;
1602    use crate::testing::*;
1603    static TEST_FINISHED: AtomicBool = AtomicBool::new(false);
1604
1605    /*
1606    #[test]
1607    #[cfg(target_pointer_width = "64")]
1608    fn size_test() {
1609      assert_size!(Pipe<mio::net::TcpStream>, 224);
1610      assert_size!(SendProxyProtocol<mio::net::TcpStream>, 144);
1611      assert_size!(RelayProxyProtocol<mio::net::TcpStream>, 152);
1612      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
1613      assert_size!(State, 528);
1614      // fails depending on the platform?
1615      //assert_size!(Session, 808);
1616    }*/
1617
1618    #[test]
1619    fn round_trip() {
1620        setup_test_logger!();
1621        let barrier = Arc::new(Barrier::new(2));
1622        start_server(barrier.clone());
1623        let _tx = start_proxy().expect("Could not start proxy");
1624        barrier.wait();
1625
1626        let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1627        let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1628        let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
1629
1630        s1.write(&b"hello "[..])
1631            .map_err(|e| {
1632                TEST_FINISHED.store(true, Ordering::Relaxed);
1633                e
1634            })
1635            .unwrap();
1636        println!("s1 sent");
1637
1638        s2.write(&b"pouet pouet"[..])
1639            .map_err(|e| {
1640                TEST_FINISHED.store(true, Ordering::Relaxed);
1641                e
1642            })
1643            .unwrap();
1644
1645        println!("s2 sent");
1646
1647        let mut res = [0; 128];
1648        s1.write(&b"coucou"[..])
1649            .map_err(|e| {
1650                TEST_FINISHED.store(true, Ordering::Relaxed);
1651                e
1652            })
1653            .unwrap();
1654
1655        s3.shutdown(Shutdown::Both).unwrap();
1656        let sz2 = s2
1657            .read(&mut res[..])
1658            .map_err(|e| {
1659                TEST_FINISHED.store(true, Ordering::Relaxed);
1660                e
1661            })
1662            .expect("could not read from socket");
1663        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
1664        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);
1665
1666        let sz1 = s1
1667            .read(&mut res[..])
1668            .map_err(|e| {
1669                TEST_FINISHED.store(true, Ordering::Relaxed);
1670                e
1671            })
1672            .expect("could not read from socket");
1673        println!(
1674            "s1 received again({}): {:?}",
1675            sz1,
1676            str::from_utf8(&res[..sz1])
1677        );
1678        assert_eq!(&res[..sz1], &b"hello coucou"[..]);
1679        TEST_FINISHED.store(true, Ordering::Relaxed);
1680    }
1681
1682    fn start_server(barrier: Arc<Barrier>) {
1683        let listener = TcpListener::bind("127.0.0.1:5678").expect("could not bind");
1684        fn handle_client(stream: &mut TcpStream, id: u8) {
1685            let mut buf = [0; 128];
1686            let _response = b" END";
1687            while let Ok(sz) = stream.read(&mut buf[..]) {
1688                if sz > 0 {
1689                    println!("ECHO[{}] got \"{:?}\"", id, str::from_utf8(&buf[..sz]));
1690                    stream.write(&buf[..sz]).unwrap();
1691                }
1692                if TEST_FINISHED.load(Ordering::Relaxed) {
1693                    println!("backend server stopping");
1694                    break;
1695                }
1696            }
1697        }
1698
1699        let mut count = 0;
1700        thread::spawn(move || {
1701            barrier.wait();
1702            for conn in listener.incoming() {
1703                match conn {
1704                    Ok(mut stream) => {
1705                        thread::spawn(move || {
1706                            println!("got a new client: {count}");
1707                            handle_client(&mut stream, count)
1708                        });
1709                    }
1710                    Err(e) => {
1711                        println!("connection failed: {e:?}");
1712                    }
1713                }
1714                count += 1;
1715            }
1716        });
1717    }
1718
1719    /// used in tests only
1720    pub fn start_proxy() -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
1721        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, 1234))
1722            .to_tcp(None)
1723            .expect("could not create listener config");
1724
1725        let (mut command, channel) =
1726            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
1727        let _jg = thread::spawn(move || {
1728            setup_test_logger!();
1729            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
1730        });
1731
1732        command.blocking().unwrap();
1733        {
1734            let front = RequestTcpFrontend {
1735                cluster_id: String::from("yolo"),
1736                address: SocketAddress::new_v4(127, 0, 0, 1, 1234),
1737                ..Default::default()
1738            };
1739            let backend = sozu_command_lib::response::Backend {
1740                cluster_id: String::from("yolo"),
1741                backend_id: String::from("yolo-0"),
1742                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
1743                load_balancing_parameters: Some(LoadBalancingParams::default()),
1744                sticky_id: None,
1745                backup: None,
1746            };
1747
1748            command
1749                .write_message(&WorkerRequest {
1750                    id: String::from("ID_YOLO1"),
1751                    content: RequestType::AddTcpFrontend(front).into(),
1752                })
1753                .unwrap();
1754            command
1755                .write_message(&WorkerRequest {
1756                    id: String::from("ID_YOLO2"),
1757                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1758                })
1759                .unwrap();
1760        }
1761        {
1762            let front = RequestTcpFrontend {
1763                cluster_id: String::from("yolo"),
1764                address: SocketAddress::new_v4(127, 0, 0, 1, 1235),
1765                ..Default::default()
1766            };
1767            let backend = sozu_command::response::Backend {
1768                cluster_id: String::from("yolo"),
1769                backend_id: String::from("yolo-0"),
1770                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
1771                load_balancing_parameters: Some(LoadBalancingParams::default()),
1772                sticky_id: None,
1773                backup: None,
1774            };
1775            command
1776                .write_message(&WorkerRequest {
1777                    id: String::from("ID_YOLO3"),
1778                    content: RequestType::AddTcpFrontend(front).into(),
1779                })
1780                .unwrap();
1781            command
1782                .write_message(&WorkerRequest {
1783                    id: String::from("ID_YOLO4"),
1784                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
1785                })
1786                .unwrap();
1787        }
1788
1789        // not sure why four times
1790        for _ in 0..4 {
1791            println!(
1792                "read_message: {:?}",
1793                command
1794                    .read_message()
1795                    .with_context(|| "could not read message")?
1796            );
1797        }
1798
1799        Ok(command)
1800    }
1801}