Skip to main content

sozu_lib/
tcp.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    Interest, Registry, Token,
13    net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14    unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18    ObjectKind,
19    config::MAX_LOOP_ITERATIONS,
20    logging::{EndpointRecord, LogContext, ansi_palette},
21    proto::command::request::RequestType,
22};
23
24use crate::metrics::names;
25use crate::{
26    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
27    ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
28    Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
29    backends::{Backend, BackendMap},
30    pool::{Checkout, Pool},
31    protocol::{
32        Pipe,
33        pipe::WebSocketContext,
34        proxy_protocol::{
35            expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
36        },
37    },
38    retry::RetryPolicy,
39    server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
40    socket::{server_bind, stats::socket_rtt},
41    sozu_command::{
42        proto::command::{
43            Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
44            UpdateTcpListenerConfig, WorkerRequest, WorkerResponse,
45        },
46        ready::Ready,
47        state::ClusterId,
48    },
49    timer::TimeoutContainer,
50};
51
StateMachineBuilder! {
    /// The various Stages of a TCP connection:
    ///
    /// 1. optional (ExpectProxyProtocol | SendProxyProtocol | RelayProxyProtocol)
    /// 2. Pipe
    ///
    /// `TcpSession::new` chooses the first stage from its `proxy_protocol`
    /// argument (`None` starts directly in `Pipe`), and `TcpSession::upgrade`
    /// replaces a PROXY-protocol stage with a `Pipe`. The macro also provides
    /// the `FailedUpgrade` variant and the `take()` helper that `TcpSession`
    /// matches on and calls; their definitions are not visible in this file.
    enum TcpStateMachine {
        // Final stage: reached directly (no PROXY config) or through `upgrade`.
        Pipe(Pipe<MioTcpStream, TcpListener>),
        // Initial stage for `Some(ProxyProtocolConfig::SendHeader)`.
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        // Initial stage for `Some(ProxyProtocolConfig::RelayHeader)`.
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        // Initial stage for `Some(ProxyProtocolConfig::ExpectHeader)`; it has
        // no backend socket or backend readiness.
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
64
/// Builds the log-line prefix used by `TcpSession` methods in this module.
///
/// The prefix contains the session's `LogContext`, a `TCP` protocol label and
/// the frontend/backend token numbers (`<none>` while no backend token is
/// set). Colours come from `ansi_palette()`: the context and keys use the
/// `gray` slot, the `TCP` label the `open` slot, the `Session` keyword the
/// `grey` slot and the values the `white` slot; each styled span ends with
/// `reset`.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        let backend_label = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{gray}{ctx}{reset}\t{open}TCP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.log_context(),
            frontend = $self.frontend_token.0,
            backend = backend_label,
        )
    }};
}
88
/// Log prefix for lines emitted from this module when no `TcpSession` is in
/// scope (e.g. proxy-level callbacks or test helpers that only hold a
/// listener/token map).
///
/// It renders the `TCP` label wrapped in the palette's `open`/`reset` styles,
/// matching the label that `log_context!` produces, followed by `\t >>>`.
macro_rules! log_module_context {
    () => {{
        let palette = sozu_command::logging::ansi_palette();
        format!("{open}TCP{reset}\t >>>", open = palette.0, reset = palette.1)
    }};
}
102
/// One proxied TCP connection: the frontend socket, the (eventual) backend
/// connection, and the `TcpStateMachine` stage that moves bytes between them.
pub struct TcpSession {
    /// Backend buffer parked while a PROXY-protocol stage runs; taken by the
    /// `upgrade_*` helpers when building the `Pipe`. `None` when the session
    /// started directly as a `Pipe` (which received the buffer in `new`).
    backend_buffer: Option<Checkout>,
    /// Backend connection status, updated through `set_back_connected`.
    backend_connected: BackendConnectionStatus,
    /// Identifier of the selected backend (set by `set_backend_id`); also
    /// reported by `log_context`.
    backend_id: Option<String>,
    /// mio token of the backend socket; `None` until `set_back_token`, cleared
    /// again by `remove_backend`.
    backend_token: Option<Token>,
    /// Backend currently in use; `remove_backend` decrements its connection
    /// count and drops it.
    backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster this session is routed to, if known.
    cluster_id: Option<String>,
    /// Duration the backend timer switches to once the backend is connected
    /// (see `set_back_connected`).
    configured_backend_timeout: Duration,
    /// Consecutive failed backend connection attempts: incremented on a
    /// failed connect in `ready_inner`, reset to 0 once connected.
    connection_attempt: u8,
    /// Backend timer; created empty with the connect timeout in `new`.
    container_backend_timeout: TimeoutContainer,
    /// Frontend timer, registered for `frontend_token` in `new`.
    container_frontend_timeout: TimeoutContainer,
    /// Raw peer address of the frontend socket, captured in `new`.
    frontend_address: Option<SocketAddr>,
    /// Frontend buffer parked while a PROXY-protocol stage runs (Expect/Send);
    /// `None` when the initial stage already owns it (Relay, Pipe).
    frontend_buffer: Option<Checkout>,
    /// mio token of the frontend socket.
    frontend_token: Token,
    /// Initialised to `false` in `new`; presumably set by the close path,
    /// which is not visible here — TODO confirm.
    has_been_closed: SessionIsToBeClosed,
    /// Initialised to `Instant::now()` in `new`; where it is refreshed is not
    /// visible here — TODO confirm.
    last_event: Instant,
    /// Listener that accepted this session (used for access-log tags).
    listener: Rc<RefCell<TcpListener>>,
    /// Per-session metrics, created with the accept wait time.
    metrics: SessionMetrics,
    /// Owning proxy.
    proxy: Rc<RefCell<TcpProxy>>,
    /// Ulid generated in `new`; `log_context` uses it as both the session id
    /// and the request id.
    request_id: Ulid,
    /// Current protocol stage.
    state: TcpStateMachine,
    /// `true` once `connect_to_backend` has accounted this session
    /// against the per-(cluster, source-IP) connection counter. Drives
    /// the symmetric `untrack_all_cluster_ip` call in `close`. The flag
    /// is per-session, not per-attempt: a TCP session has at most one
    /// `(cluster, ip)` slot, so the SessionManager-side idempotency
    /// already covers retries — this flag exists only to short-circuit
    /// the close path's untrack when the feature is disabled or no
    /// admit ever ran.
    cluster_ip_tracked: bool,
}
134
135impl TcpSession {
136    #[allow(clippy::too_many_arguments)]
137    fn new(
138        backend_buffer: Checkout,
139        backend_id: Option<String>,
140        cluster_id: Option<String>,
141        configured_backend_timeout: Duration,
142        configured_connect_timeout: Duration,
143        configured_frontend_timeout: Duration,
144        frontend_buffer: Checkout,
145        frontend_token: Token,
146        listener: Rc<RefCell<TcpListener>>,
147        proxy_protocol: Option<ProxyProtocolConfig>,
148        proxy: Rc<RefCell<TcpProxy>>,
149        socket: MioTcpStream,
150        wait_time: Duration,
151    ) -> TcpSession {
152        let frontend_address = socket.peer_addr().ok();
153        let mut frontend_buffer_session = None;
154        let mut backend_buffer_session = None;
155
156        let request_id = Ulid::generate();
157
158        let container_frontend_timeout =
159            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
160        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);
161
162        let state = match proxy_protocol {
163            Some(ProxyProtocolConfig::RelayHeader) => {
164                backend_buffer_session = Some(backend_buffer);
165                gauge_add!(names::protocol::PROXY_RELAY, 1);
166                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
167                    socket,
168                    frontend_token,
169                    request_id,
170                    None,
171                    frontend_buffer,
172                ))
173            }
174            Some(ProxyProtocolConfig::ExpectHeader) => {
175                frontend_buffer_session = Some(frontend_buffer);
176                backend_buffer_session = Some(backend_buffer);
177                gauge_add!(names::protocol::PROXY_EXPECT, 1);
178                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
179                    container_frontend_timeout.clone(),
180                    socket,
181                    frontend_token,
182                    request_id,
183                ))
184            }
185            Some(ProxyProtocolConfig::SendHeader) => {
186                frontend_buffer_session = Some(frontend_buffer);
187                backend_buffer_session = Some(backend_buffer);
188                gauge_add!(names::protocol::PROXY_SEND, 1);
189                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
190                    socket,
191                    frontend_token,
192                    request_id,
193                    None,
194                ))
195            }
196            None => {
197                gauge_add!(names::protocol::TCP, 1);
198                let mut pipe = Pipe::new(
199                    backend_buffer,
200                    backend_id.clone(),
201                    None,
202                    None,
203                    None,
204                    None,
205                    cluster_id.clone(),
206                    frontend_buffer,
207                    frontend_token,
208                    socket,
209                    listener.clone(),
210                    Protocol::TCP,
211                    request_id,
212                    request_id,
213                    frontend_address,
214                    WebSocketContext::Tcp,
215                );
216                pipe.set_cluster_id(cluster_id.clone());
217                TcpStateMachine::Pipe(pipe)
218            }
219        };
220
221        let metrics = SessionMetrics::new(Some(wait_time));
222        //FIXME: timeout usage
223
224        TcpSession {
225            backend_buffer: backend_buffer_session,
226            backend_connected: BackendConnectionStatus::NotConnected,
227            backend_id,
228            backend_token: None,
229            backend: None,
230            cluster_id,
231            configured_backend_timeout,
232            connection_attempt: 0,
233            container_backend_timeout,
234            container_frontend_timeout,
235            frontend_address,
236            frontend_buffer: frontend_buffer_session,
237            frontend_token,
238            has_been_closed: false,
239            last_event: Instant::now(),
240            listener,
241            metrics,
242            proxy,
243            request_id,
244            state,
245            cluster_ip_tracked: false,
246        }
247    }
248
249    /// Source-IP for per-(cluster, source-IP) accounting.
250    ///
251    /// Prefer the parsed PROXY-v2 source from whichever upgrade phase is
252    /// in flight, then the post-upgrade `Pipe.session_address`, finally
253    /// the raw TCP `peer_addr` captured at session creation. The
254    /// `Pipe::session_address` itself is already PROXY-v2-aware after
255    /// `expect.rs::into_pipe` and `relay.rs::into_pipe`.
256    fn effective_session_address(&self) -> Option<SocketAddr> {
257        match &self.state {
258            TcpStateMachine::Pipe(pipe) => pipe.get_session_address(),
259            TcpStateMachine::ExpectProxyProtocol(epp) => {
260                epp.addresses.as_ref().and_then(|pa| pa.source())
261            }
262            TcpStateMachine::RelayProxyProtocol(rpp) => {
263                rpp.addresses.as_ref().and_then(|pa| pa.source())
264            }
265            TcpStateMachine::SendProxyProtocol(_) | TcpStateMachine::FailedUpgrade(_) => None,
266        }
267        .or(self.frontend_address)
268    }
269
    /// Records end-of-session metrics and emits the access-log line for this
    /// TCP session.
    ///
    /// NOTE(review): `session_address` is the raw peer address captured in
    /// `new`, not `effective_session_address()` (which prefers a parsed
    /// PROXY-protocol source). Confirm whether access logs behind a PROXY hop
    /// should report the original client instead.
    fn log_request(&self) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        self.metrics.register_end_of_session(&context);
        info_access!(
            on_failure: { incr!(names::access_logs::UNSENT) },
            message: None,
            context,
            session_address: self.frontend_address,
            backend_address: None,
            protocol: "TCP",
            endpoint: EndpointRecord::Tcp,
            // Tags are looked up with the listener's own address as the key.
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.state.front_socket()),
            server_rtt: None,
            user_agent: None,
            x_request_id: None,
            // A TCP session wraps a raw `MioTcpStream` (see `TcpSession::new`).
            // Sōzu does not terminate TLS on the TCP path, so the four TLS
            // fields and the parsed XFF chain are always absent here.
            tls_version: None,
            tls_cipher: None,
            tls_sni: None,
            tls_alpn: None,
            xff_chain: None,
            service_time: self.metrics.service_time(),
            response_time: self.metrics.backend_response_time(),
            request_time: self.metrics.request_time(),
            bytes_in: self.metrics.bin,
            bytes_out: self.metrics.bout,
            otel: None,
        );
    }
303
304    fn front_hup(&mut self) -> SessionResult {
305        match &mut self.state {
306            TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
307            _ => {
308                self.log_request();
309                SessionResult::Close
310            }
311        }
312    }
313
314    fn back_hup(&mut self) -> SessionResult {
315        match &mut self.state {
316            TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
317            _ => {
318                self.log_request();
319                SessionResult::Close
320            }
321        }
322    }
323
324    fn log_context(&self) -> LogContext<'_> {
325        LogContext {
326            session_id: self.request_id,
327            request_id: Some(self.request_id),
328            cluster_id: self.cluster_id.as_deref(),
329            backend_id: self.backend_id.as_deref(),
330        }
331    }
332
333    fn readable(&mut self) -> SessionResult {
334        if !self.container_frontend_timeout.reset() {
335            error!(
336                "{} Could not reset frontend timeout on readable",
337                log_context!(self)
338            );
339        }
340        if self.backend_connected == BackendConnectionStatus::Connected
341            && !self.container_backend_timeout.reset()
342        {
343            error!(
344                "{} Could not reset backend timeout on readable",
345                log_context!(self)
346            );
347        }
348        match &mut self.state {
349            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
350            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
351            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
352            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
353            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
354        }
355    }
356
357    fn writable(&mut self) -> SessionResult {
358        match &mut self.state {
359            TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
360            _ => SessionResult::Continue,
361        }
362    }
363
364    fn back_readable(&mut self) -> SessionResult {
365        if !self.container_frontend_timeout.reset() {
366            error!(
367                "{} Could not reset frontend timeout on back_readable",
368                log_context!(self)
369            );
370        }
371        if !self.container_backend_timeout.reset() {
372            error!(
373                "{} Could not reset backend timeout on back_readable",
374                log_context!(self)
375            );
376        }
377
378        match &mut self.state {
379            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
380            _ => SessionResult::Continue,
381        }
382    }
383
384    fn back_writable(&mut self) -> SessionResult {
385        match &mut self.state {
386            TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
387            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
388            TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
389            TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
390            TcpStateMachine::FailedUpgrade(_) => {
391                unreachable!()
392            }
393        }
394    }
395
396    fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
397        match &mut self.state {
398            TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
399            TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
400            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
401            TcpStateMachine::ExpectProxyProtocol(_) => None,
402            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
403        }
404    }
405
406    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
407        let new_state = match self.state.take() {
408            TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
409            TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
410            TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
411            TcpStateMachine::Pipe(_) => None,
412            TcpStateMachine::FailedUpgrade(_) => todo!(),
413        };
414
415        match new_state {
416            Some(state) => {
417                self.state = state;
418                false
419            } // The state stays FailedUpgrade, but the Session should be closed right after
420
421            None => true,
422        }
423    }
424
425    fn upgrade_send(
426        &mut self,
427        send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
428    ) -> Option<TcpStateMachine> {
429        if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
430            let mut pipe = send_proxy_protocol.into_pipe(
431                self.frontend_buffer.take().unwrap(),
432                self.backend_buffer.take().unwrap(),
433                self.listener.clone(),
434            );
435
436            pipe.set_cluster_id(self.cluster_id.clone());
437            gauge_add!(names::protocol::PROXY_SEND, -1);
438            gauge_add!(names::protocol::TCP, 1);
439            return Some(TcpStateMachine::Pipe(pipe));
440        }
441
442        error!(
443            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
444            log_context!(self)
445        );
446        None
447    }
448
449    fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
450        if self.backend_buffer.is_some() {
451            let mut pipe =
452                rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
453            pipe.set_cluster_id(self.cluster_id.clone());
454            gauge_add!(names::protocol::PROXY_RELAY, -1);
455            gauge_add!(names::protocol::TCP, 1);
456            return Some(TcpStateMachine::Pipe(pipe));
457        }
458
459        error!(
460            "{} Missing the backend buffer queue, we can't switch to a pipe",
461            log_context!(self)
462        );
463        None
464    }
465
466    fn upgrade_expect(
467        &mut self,
468        epp: ExpectProxyProtocol<MioTcpStream>,
469    ) -> Option<TcpStateMachine> {
470        if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
471            let mut pipe = epp.into_pipe(
472                self.frontend_buffer.take().unwrap(),
473                self.backend_buffer.take().unwrap(),
474                None,
475                None,
476                self.listener.clone(),
477            );
478
479            pipe.set_cluster_id(self.cluster_id.clone());
480            gauge_add!(names::protocol::PROXY_EXPECT, -1);
481            gauge_add!(names::protocol::TCP, 1);
482            return Some(TcpStateMachine::Pipe(pipe));
483        }
484
485        error!(
486            "{} Missing the backend buffer queue, we can't switch to a pipe",
487            log_context!(self)
488        );
489        None
490    }
491
492    fn front_readiness(&mut self) -> &mut Readiness {
493        match &mut self.state {
494            TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
495            TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
496            TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
497            TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
498            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
499        }
500    }
501
502    fn back_readiness(&mut self) -> Option<&mut Readiness> {
503        match &mut self.state {
504            TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
505            TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
506            TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
507            TcpStateMachine::ExpectProxyProtocol(_) => None,
508            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
509        }
510    }
511
512    fn set_back_socket(&mut self, socket: MioTcpStream) {
513        match &mut self.state {
514            TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
515            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
516            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
517            TcpStateMachine::ExpectProxyProtocol(_) => {
518                error!(
519                    "{} We should not set the back socket for the expect proxy protocol",
520                    log_context!(self)
521                );
522                panic!(
523                    "{} We should not set the back socket for the expect proxy protocol",
524                    log_context!(self)
525                );
526            }
527            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
528        }
529    }
530
531    fn set_back_token(&mut self, token: Token) {
532        self.backend_token = Some(token);
533
534        match &mut self.state {
535            TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
536            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
537            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
538            TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
539            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
540        }
541    }
542
543    fn set_backend_id(&mut self, id: String) {
544        self.backend_id = Some(id.clone());
545        if let TcpStateMachine::Pipe(pipe) = &mut self.state {
546            pipe.set_backend_id(Some(id));
547        }
548    }
549
550    fn back_connected(&self) -> BackendConnectionStatus {
551        self.backend_connected
552    }
553
554    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
555        let last = self.backend_connected;
556        self.backend_connected = status;
557
558        if status == BackendConnectionStatus::Connected {
559            gauge_add!(names::backend::CONNECTIONS, 1);
560            gauge_add!(
561                names::backend::CONNECTIONS_PER_BACKEND,
562                1,
563                self.cluster_id.as_deref(),
564                self.metrics.backend_id.as_deref()
565            );
566
567            // the back timeout was of connect_timeout duration before,
568            // now that we're connected, move to backend_timeout duration
569            self.container_backend_timeout
570                .set_duration(self.configured_backend_timeout);
571            self.container_frontend_timeout.reset();
572
573            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
574                spp.set_back_connected(BackendConnectionStatus::Connected);
575            }
576
577            if let Some(backend) = self.backend.as_ref() {
578                let mut backend = backend.borrow_mut();
579
580                if backend.retry_policy.is_down() {
581                    incr!(
582                        "backend.up",
583                        self.cluster_id.as_deref(),
584                        self.metrics.backend_id.as_deref()
585                    );
586                    gauge!(
587                        names::backend::AVAILABLE,
588                        1,
589                        self.cluster_id.as_deref(),
590                        self.metrics.backend_id.as_deref()
591                    );
592                    info!(
593                        "{} backend server {} at {} is up",
594                        log_context!(self),
595                        backend.backend_id,
596                        backend.address
597                    );
598                    push_event(Event {
599                        kind: EventKind::BackendUp as i32,
600                        backend_id: Some(backend.backend_id.to_owned()),
601                        address: Some(backend.address.into()),
602                        cluster_id: None,
603                        metric_detail: None,
604                    });
605                }
606
607                if let BackendConnectionStatus::Connecting(start) = last {
608                    backend.set_connection_time(Instant::now() - start);
609                }
610
611                //successful connection, rest failure counter
612                backend.failures = 0;
613                backend.retry_policy.succeed();
614            }
615        }
616    }
617
618    fn remove_backend(&mut self) {
619        if let Some(backend) = self.backend.take() {
620            (*backend.borrow_mut()).dec_connections();
621        }
622
623        self.backend_token = None;
624    }
625
626    fn fail_backend_connection(&mut self) {
627        if let Some(backend) = self.backend.as_ref() {
628            let backend = &mut *backend.borrow_mut();
629            backend.failures += 1;
630
631            let already_unavailable = backend.retry_policy.is_down();
632            backend.retry_policy.fail();
633            incr!(
634                "backend.connections.error",
635                self.cluster_id.as_deref(),
636                self.metrics.backend_id.as_deref()
637            );
638            if !already_unavailable && backend.retry_policy.is_down() {
639                error!(
640                    "{} backend server {} at {} is down",
641                    log_context!(self),
642                    backend.backend_id,
643                    backend.address
644                );
645                incr!(
646                    "backend.down",
647                    self.cluster_id.as_deref(),
648                    self.metrics.backend_id.as_deref()
649                );
650                gauge!(
651                    names::backend::AVAILABLE,
652                    0,
653                    self.cluster_id.as_deref(),
654                    self.metrics.backend_id.as_deref()
655                );
656
657                push_event(Event {
658                    kind: EventKind::BackendDown as i32,
659                    backend_id: Some(backend.backend_id.to_owned()),
660                    address: Some(backend.address.into()),
661                    cluster_id: None,
662                    metric_detail: None,
663                });
664            }
665        }
666    }
667
668    pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
669        match self.back_socket_mut() {
670            Some(ref mut s) => {
671                let mut tmp = [0u8; 1];
672                let res = s.peek(&mut tmp[..]);
673
674                match res {
675                    // if the socket is half open, it will report 0 bytes read (EOF)
676                    Ok(0) => false,
677                    Ok(_) => true,
678                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
679                }
680            }
681            None => false,
682        }
683    }
684
685    pub fn cancel_timeouts(&mut self) {
686        self.container_frontend_timeout.cancel();
687        self.container_backend_timeout.cancel();
688    }
689
690    fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
691        let mut counter = 0;
692
693        let back_connected = self.back_connected();
694        if back_connected.is_connecting() {
695            if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
696                //retry connecting the backend
697                debug!(
698                    "{} error connecting to backend, trying again",
699                    log_context!(self)
700                );
701                self.connection_attempt += 1;
702                self.fail_backend_connection();
703
704                // trigger a backend reconnection
705                self.close_backend();
706                let connection_result = self.connect_to_backend(session.clone());
707                if let Err(err) = &connection_result {
708                    match err {
709                        // Already logged at warn! + metered at the retry-budget
710                        // gate in connect_to_backend; avoid double-emission.
711                        BackendConnectionError::MaxConnectionRetries(_) => trace!(
712                            "{} Error connecting to backend: {}",
713                            log_context!(self),
714                            err
715                        ),
716                        _ => warn!(
717                            "{} Error connecting to backend: {}",
718                            log_context!(self),
719                            err
720                        ),
721                    }
722                }
723
724                if let Some(state_result) = handle_connection_result(connection_result) {
725                    return state_result;
726                }
727            } else if self.back_readiness().unwrap().event != Ready::EMPTY {
728                self.connection_attempt = 0;
729                self.set_back_connected(BackendConnectionStatus::Connected);
730            }
731        } else if back_connected == BackendConnectionStatus::NotConnected {
732            let connection_result = self.connect_to_backend(session.clone());
733            if let Err(err) = &connection_result {
734                match err {
735                    BackendConnectionError::MaxConnectionRetries(_) => trace!(
736                        "{} Error connecting to backend: {}",
737                        log_context!(self),
738                        err
739                    ),
740                    _ => warn!(
741                        "{} Error connecting to backend: {}",
742                        log_context!(self),
743                        err
744                    ),
745                }
746            }
747
748            if let Some(state_result) = handle_connection_result(connection_result) {
749                return state_result;
750            }
751        }
752
753        if self.front_readiness().event.is_hup() {
754            let session_result = self.front_hup();
755            if session_result == SessionResult::Continue {
756                self.front_readiness().event.remove(Ready::HUP);
757            }
758            return session_result;
759        }
760
761        while counter < MAX_LOOP_ITERATIONS {
762            let front_interest = self.front_readiness().interest & self.front_readiness().event;
763            let back_interest = self
764                .back_readiness()
765                .map(|r| r.interest & r.event)
766                .unwrap_or(Ready::EMPTY);
767
768            trace!(
769                "{} Frontend interest({:?}) and backend interest({:?})",
770                log_context!(self),
771                front_interest,
772                back_interest
773            );
774
775            if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
776                break;
777            }
778
779            if self
780                .back_readiness()
781                .map(|r| r.event.is_hup())
782                .unwrap_or(false)
783                && self.front_readiness().interest.is_writable()
784                && !self.front_readiness().event.is_writable()
785            {
786                break;
787            }
788
789            if front_interest.is_readable() {
790                let session_result = self.readable();
791                if session_result != SessionResult::Continue {
792                    return session_result;
793                }
794            }
795
796            if back_interest.is_writable() {
797                let session_result = self.back_writable();
798                if session_result != SessionResult::Continue {
799                    return session_result;
800                }
801            }
802
803            if back_interest.is_readable() {
804                let session_result = self.back_readable();
805                if session_result != SessionResult::Continue {
806                    return session_result;
807                }
808            }
809
810            if front_interest.is_writable() {
811                let session_result = self.writable();
812                if session_result != SessionResult::Continue {
813                    return session_result;
814                }
815            }
816
817            if back_interest.is_hup() {
818                let session_result = self.back_hup();
819                if session_result != SessionResult::Continue {
820                    return session_result;
821                }
822            }
823
824            if front_interest.is_error() {
825                error!(
826                    "{} Frontend socket error, disconnecting",
827                    log_context!(self)
828                );
829                self.front_readiness().interest = Ready::EMPTY;
830                if let Some(r) = self.back_readiness() {
831                    r.interest = Ready::EMPTY;
832                }
833
834                return SessionResult::Close;
835            }
836
837            if back_interest.is_error() && self.back_hup() == SessionResult::Close {
838                self.front_readiness().interest = Ready::EMPTY;
839                if let Some(r) = self.back_readiness() {
840                    r.interest = Ready::EMPTY;
841                }
842
843                error!("{} backend socket error, disconnecting", log_context!(self));
844                return SessionResult::Close;
845            }
846
847            counter += 1;
848        }
849
850        if counter >= MAX_LOOP_ITERATIONS {
851            error!(
852                "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
853                log_context!(self),
854                MAX_LOOP_ITERATIONS
855            );
856
857            incr!(names::tcp::INFINITE_LOOP_ERROR);
858
859            let front_interest = self.front_readiness().interest & self.front_readiness().event;
860            let back_interest = self
861                .back_readiness()
862                .map(|r| r.interest & r.event)
863                .unwrap_or(Ready::EMPTY);
864
865            let back = self.back_readiness().cloned();
866
867            error!(
868                "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
869                log_context!(self),
870                self.front_readiness(),
871                back,
872                front_interest,
873                back_interest
874            );
875
876            self.print_session();
877
878            return SessionResult::Close;
879        }
880
881        SessionResult::Continue
882    }
883
884    /// TCP session closes its backend on its own, without defering this task to the state
885    fn close_backend(&mut self) {
886        if let (Some(token), Some(fd)) = (
887            self.backend_token,
888            self.back_socket_mut().map(|s| s.as_raw_fd()),
889        ) {
890            let proxy = self.proxy.borrow();
891            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
892                error!(
893                    "{} Error deregistering socket({:?}): {:?}",
894                    log_context!(self),
895                    fd,
896                    e
897                );
898            }
899
900            proxy.sessions.borrow_mut().slab.try_remove(token.0);
901        }
902        self.remove_backend();
903
904        let back_connected = self.back_connected();
905        if back_connected != BackendConnectionStatus::NotConnected {
906            if let Some(r) = self.back_readiness() {
907                r.event = Ready::EMPTY;
908            }
909
910            let log_context = log_context!(self);
911            if let Some(sock) = self.back_socket_mut() {
912                // TCP-only backend in the pure-TCP proxy: no outbound TLS
913                // buffer to truncate, so `Shutdown::Both` is the right call.
914                // If the TCP listener ever gains an inline TLS upgrade,
915                // switch to `Shutdown::Write` here.
916                if let Err(e) = sock.shutdown(Shutdown::Both) {
917                    if e.kind() != ErrorKind::NotConnected {
918                        error!(
919                            "{} Error closing back socket({:?}): {:?}",
920                            log_context, sock, e
921                        );
922                    }
923                }
924            }
925        }
926
927        if back_connected == BackendConnectionStatus::Connected {
928            gauge_add!(names::backend::CONNECTIONS, -1);
929            gauge_add!(
930                names::backend::CONNECTIONS_PER_BACKEND,
931                -1,
932                self.cluster_id.as_deref(),
933                self.metrics.backend_id.as_deref()
934            );
935        }
936
937        self.set_back_connected(BackendConnectionStatus::NotConnected);
938    }
939
940    fn connect_to_backend(
941        &mut self,
942        session_rc: Rc<RefCell<dyn ProxySession>>,
943    ) -> Result<BackendConnectAction, BackendConnectionError> {
944        let cluster_id = self
945            .listener
946            .borrow()
947            .cluster_id
948            .clone()
949            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
950
951        self.cluster_id = Some(cluster_id.clone());
952
953        if self.connection_attempt >= CONN_RETRIES {
954            incr!(
955                "backend.connect.retries_exhausted",
956                self.cluster_id.as_deref(),
957                self.metrics.backend_id.as_deref()
958            );
959            warn!(
960                "{} Max connection attempt reached ({})",
961                log_context!(self),
962                self.connection_attempt
963            );
964            return Err(BackendConnectionError::MaxConnectionRetries(Some(
965                cluster_id,
966            )));
967        }
968
969        if self.proxy.borrow().sessions.borrow().at_capacity() {
970            return Err(BackendConnectionError::MaxSessionsMemory);
971        }
972
973        // Per-(cluster, source-IP) connection limit gate (TCP). The
974        // source IP comes from `effective_session_address`, which folds
975        // a parsed PROXY-v2 source over the raw `peer_addr`. The mux's
976        // Router does the same gate for HTTP/HTTPS sessions; here it
977        // runs for raw TCP. Rejection produces a graceful TCP FIN via
978        // `BackendConnectionError::TooManyConnectionsPerIp` →
979        // `handle_connection_result` → `SessionResult::Close` — TCP has
980        // no HTTP envelope to carry a 429 / `Retry-After`.
981        let cluster_max_connections_per_ip = self
982            .proxy
983            .borrow()
984            .configs
985            .get(&cluster_id)
986            .and_then(|c| c.max_connections_per_ip);
987        if let Some(ip) = self.effective_session_address().map(|sa| sa.ip()) {
988            let sessions_rc = self.proxy.borrow().sessions.clone();
989            let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
990                self.frontend_token,
991                &cluster_id,
992                &ip,
993                cluster_max_connections_per_ip,
994            );
995            if at_limit {
996                debug!(
997                    "{} per-(cluster, source-IP) limit hit for cluster {} from {}",
998                    log_context!(self),
999                    cluster_id,
1000                    ip
1001                );
1002                return Err(BackendConnectionError::TooManyConnectionsPerIp { cluster_id });
1003            }
1004            sessions_rc
1005                .borrow_mut()
1006                .track_cluster_ip(self.frontend_token, cluster_id.clone(), ip);
1007            self.cluster_ip_tracked = true;
1008        }
1009
1010        let (backend, mut stream) = self
1011            .proxy
1012            .borrow()
1013            .backends
1014            .borrow_mut()
1015            .backend_from_cluster_id(&cluster_id)
1016            .map_err(BackendConnectionError::Backend)?;
1017
1018        if let Err(e) = stream.set_nodelay(true) {
1019            error!(
1020                "{} Error setting nodelay on back socket({:?}): {:?}",
1021                log_context!(self),
1022                stream,
1023                e
1024            );
1025        }
1026        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
1027
1028        let back_token = {
1029            let proxy = self.proxy.borrow();
1030            let mut s = proxy.sessions.borrow_mut();
1031            let entry = s.slab.vacant_entry();
1032            let back_token = Token(entry.key());
1033            let _entry = entry.insert(session_rc.clone());
1034            back_token
1035        };
1036
1037        if let Err(e) = self.proxy.borrow().registry.register(
1038            &mut stream,
1039            back_token,
1040            Interest::READABLE | Interest::WRITABLE,
1041        ) {
1042            error!(
1043                "{} Error registering back socket({:?}): {:?}",
1044                log_context!(self),
1045                stream,
1046                e
1047            );
1048        }
1049
1050        self.container_backend_timeout.set(back_token);
1051
1052        self.set_back_token(back_token);
1053        self.set_back_socket(stream);
1054
1055        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
1056        self.metrics.backend_start();
1057        self.set_backend_id(backend.borrow().backend_id.clone());
1058
1059        Ok(BackendConnectAction::New)
1060    }
1061}
1062
1063impl ProxySession for TcpSession {
1064    fn close(&mut self) {
1065        if self.has_been_closed {
1066            return;
1067        }
1068
1069        // TODO: the state should handle the timeouts
1070        trace!("{} Closing TCP session", log_context!(self));
1071        self.metrics.service_stop();
1072
1073        // Drain the per-(cluster, source-IP) accounting before any
1074        // early-return path below. The fail / non-fail close branches
1075        // both count, and the SessionManager-side untrack is idempotent
1076        // (no-op when the slot was never tracked) so this is safe even
1077        // when `cluster_ip_tracked` is false.
1078        if self.cluster_ip_tracked {
1079            self.proxy
1080                .borrow()
1081                .sessions
1082                .borrow_mut()
1083                .untrack_all_cluster_ip(self.frontend_token);
1084            self.cluster_ip_tracked = false;
1085        }
1086
1087        // Restore gauges
1088        match self.state.marker() {
1089            StateMarker::Pipe => gauge_add!(names::protocol::TCP, -1),
1090            StateMarker::SendProxyProtocol => gauge_add!(names::protocol::PROXY_SEND, -1),
1091            StateMarker::RelayProxyProtocol => gauge_add!(names::protocol::PROXY_RELAY, -1),
1092            StateMarker::ExpectProxyProtocol => gauge_add!(names::protocol::PROXY_EXPECT, -1),
1093        }
1094
1095        if self.state.failed() {
1096            match self.state.marker() {
1097                StateMarker::Pipe => incr!(names::tcp::UPGRADE_PIPE_FAILED),
1098                StateMarker::SendProxyProtocol => incr!(names::tcp::UPGRADE_SEND_FAILED),
1099                StateMarker::RelayProxyProtocol => incr!(names::tcp::UPGRADE_RELAY_FAILED),
1100                StateMarker::ExpectProxyProtocol => incr!(names::tcp::UPGRADE_EXPECT_FAILED),
1101            }
1102            return;
1103        }
1104
1105        self.cancel_timeouts();
1106
1107        let front_socket = self.state.front_socket();
1108        // TCP listener is plaintext at this layer — `Shutdown::Both` does not
1109        // truncate any TLS write buffer, so the canonical anti-pattern
1110        // (forces a TCP RST on the read direction, dropping in-flight bytes)
1111        // does not apply. Move to `Shutdown::Write` if a TLS upgrade ever
1112        // wraps this listener.
1113        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
1114            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
1115            if e.kind() != ErrorKind::NotConnected {
1116                error!(
1117                    "{} Error shutting down front socket({:?}): {:?}",
1118                    log_context!(self),
1119                    front_socket,
1120                    e
1121                );
1122            }
1123        }
1124
1125        // deregister the frontend and remove it, in a separate scope to drop proxy when done
1126        {
1127            let proxy = self.proxy.borrow();
1128            let fd = front_socket.as_raw_fd();
1129            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
1130                error!(
1131                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
1132                    log_context!(self),
1133                    fd,
1134                    e
1135                );
1136            }
1137            proxy
1138                .sessions
1139                .borrow_mut()
1140                .slab
1141                .try_remove(self.frontend_token.0);
1142        }
1143
1144        self.close_backend();
1145        self.has_been_closed = true;
1146    }
1147
1148    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
1149        if self.frontend_token == token {
1150            self.container_frontend_timeout.triggered();
1151            return true;
1152        }
1153        if self.backend_token == Some(token) {
1154            self.container_backend_timeout.triggered();
1155            return true;
1156        }
1157        // invalid token, obsolete timeout triggered
1158        false
1159    }
1160
1161    fn protocol(&self) -> Protocol {
1162        Protocol::TCP
1163    }
1164
1165    fn update_readiness(&mut self, token: Token, events: Ready) {
1166        trace!(
1167            "{} token {:?} got event {}",
1168            log_context!(self),
1169            token,
1170            super::ready_to_string(events)
1171        );
1172
1173        self.last_event = Instant::now();
1174        self.metrics.wait_start();
1175
1176        if self.frontend_token == token {
1177            self.front_readiness().event = self.front_readiness().event | events;
1178        } else if self.backend_token == Some(token) {
1179            if let Some(r) = self.back_readiness() {
1180                r.event |= events;
1181            }
1182        }
1183    }
1184
1185    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1186        self.metrics.service_start();
1187
1188        let session_result = self.ready_inner(session.clone());
1189
1190        let to_bo_closed = match session_result {
1191            SessionResult::Close => true,
1192            SessionResult::Continue => false,
1193            SessionResult::Upgrade => match self.upgrade() {
1194                false => self.ready(session),
1195                true => true,
1196            },
1197        };
1198
1199        self.metrics.service_stop();
1200        to_bo_closed
1201    }
1202
1203    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1204        true
1205    }
1206
1207    fn last_event(&self) -> Instant {
1208        self.last_event
1209    }
1210
1211    fn print_session(&self) {
1212        let state: String = match &self.state {
1213            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
1214            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
1215            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
1216            TcpStateMachine::Pipe(_) => String::from("TCP"),
1217            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
1218        };
1219
1220        let front_readiness = match &self.state {
1221            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
1222            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
1223            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
1224            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
1225            TcpStateMachine::FailedUpgrade(_) => None,
1226        };
1227
1228        let back_readiness = match &self.state {
1229            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
1230            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
1231            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
1232            TcpStateMachine::ExpectProxyProtocol(_) => None,
1233            TcpStateMachine::FailedUpgrade(_) => None,
1234        };
1235
1236        error!(
1237            "\
1238{} Session ({:?})
1239\tFrontend:
1240\t\ttoken: {:?}\treadiness: {:?}
1241\tBackend:
1242\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
1243            log_context!(self),
1244            state,
1245            self.frontend_token,
1246            front_readiness,
1247            self.backend_token,
1248            back_readiness,
1249            self.backend_connected,
1250            self.cluster_id
1251        );
1252        error!("Metrics: {:?}", self.metrics);
1253    }
1254
1255    fn frontend_token(&self) -> Token {
1256        self.frontend_token
1257    }
1258}
1259
1260pub struct TcpListener {
1261    active: SessionIsToBeClosed,
1262    address: SocketAddr,
1263    cluster_id: Option<String>,
1264    config: TcpListenerConfig,
1265    listener: Option<MioTcpListener>,
1266    tags: BTreeMap<String, CachedTags>,
1267    token: Token,
1268}
1269
1270impl ListenerHandler for TcpListener {
1271    fn get_addr(&self) -> &SocketAddr {
1272        &self.address
1273    }
1274
1275    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1276        self.tags.get(key)
1277    }
1278
1279    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1280        match tags {
1281            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1282            None => self.tags.remove(&key),
1283        };
1284    }
1285
1286    fn protocol(&self) -> Protocol {
1287        Protocol::TCP
1288    }
1289
1290    fn public_address(&self) -> SocketAddr {
1291        self.config
1292            .public_address
1293            .map(|addr| addr.into())
1294            .unwrap_or(self.address)
1295    }
1296}
1297
1298impl TcpListener {
1299    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1300        Ok(TcpListener {
1301            cluster_id: None,
1302            listener: None,
1303            token,
1304            address: config.address.into(),
1305            config,
1306            active: false,
1307            tags: BTreeMap::new(),
1308        })
1309    }
1310
1311    pub fn activate(
1312        &mut self,
1313        registry: &Registry,
1314        tcp_listener: Option<MioTcpListener>,
1315    ) -> Result<Token, ProxyError> {
1316        if self.active {
1317            return Ok(self.token);
1318        }
1319
1320        let mut listener = match tcp_listener {
1321            Some(listener) => listener,
1322            None => {
1323                let address = self.config.address.into();
1324                server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1325            }
1326        };
1327
1328        registry
1329            .register(&mut listener, self.token, Interest::READABLE)
1330            .map_err(ProxyError::RegisterListener)?;
1331
1332        self.listener = Some(listener);
1333        self.active = true;
1334        Ok(self.token)
1335    }
1336
1337    /// Apply a partial-update patch to this TCP listener's live configuration.
1338    ///
1339    /// Fields absent in the patch (i.e. `None`) are preserved unchanged.
1340    pub fn update_config(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), ListenerError> {
1341        if let Some(v) = patch.public_address {
1342            self.config.public_address = Some(v);
1343        }
1344        if let Some(v) = patch.expect_proxy {
1345            self.config.expect_proxy = v;
1346        }
1347        if let Some(v) = patch.front_timeout {
1348            self.config.front_timeout = v;
1349        }
1350        if let Some(v) = patch.back_timeout {
1351            self.config.back_timeout = v;
1352        }
1353        if let Some(v) = patch.connect_timeout {
1354            self.config.connect_timeout = v;
1355        }
1356        Ok(())
1357    }
1358}
1359
1360fn handle_connection_result(
1361    connection_result: Result<BackendConnectAction, BackendConnectionError>,
1362) -> Option<SessionResult> {
1363    match connection_result {
1364        // reuse connection or send a default answer, we can continue
1365        Ok(BackendConnectAction::Reuse) => None,
1366        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1367            // we must wait for an event
1368            Some(SessionResult::Continue)
1369        }
1370        Err(_) => {
1371            // in case of BackendConnectionError::Backend(BackendError::ConnectionFailures(..))
1372            // we may want to retry instead of closing
1373            Some(SessionResult::Close)
1374        }
1375    }
1376}
1377
1378#[derive(Debug)]
1379pub struct ClusterConfiguration {
1380    proxy_protocol: Option<ProxyProtocolConfig>,
1381    // Uncomment this when implementing new load balancing algorithms
1382    // load_balancing: LoadBalancingAlgorithms,
1383    /// Per-cluster override of the global per-(cluster, source-IP)
1384    /// connection limit. `None` inherits the global default,
1385    /// `Some(0)` is explicit "unlimited", `Some(n > 0)` overrides.
1386    /// Resolved against `SessionManager::effective_max_connections_per_ip`
1387    /// at admit time in `connect_to_backend`.
1388    pub max_connections_per_ip: Option<u64>,
1389}
1390
1391pub struct TcpProxy {
1392    fronts: HashMap<String, Token>,
1393    backends: Rc<RefCell<BackendMap>>,
1394    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
1395    configs: HashMap<ClusterId, ClusterConfiguration>,
1396    registry: Registry,
1397    sessions: Rc<RefCell<SessionManager>>,
1398    pool: Rc<RefCell<Pool>>,
1399}
1400
1401impl TcpProxy {
1402    pub fn new(
1403        registry: Registry,
1404        sessions: Rc<RefCell<SessionManager>>,
1405        pool: Rc<RefCell<Pool>>,
1406        backends: Rc<RefCell<BackendMap>>,
1407    ) -> TcpProxy {
1408        TcpProxy {
1409            backends,
1410            listeners: HashMap::new(),
1411            configs: HashMap::new(),
1412            fronts: HashMap::new(),
1413            registry,
1414            sessions,
1415            pool,
1416        }
1417    }
1418
1419    pub fn add_listener(
1420        &mut self,
1421        config: TcpListenerConfig,
1422        token: Token,
1423    ) -> Result<Token, ProxyError> {
1424        match self.listeners.entry(token) {
1425            Entry::Vacant(entry) => {
1426                let tcp_listener =
1427                    TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1428                entry.insert(Rc::new(RefCell::new(tcp_listener)));
1429                Ok(token)
1430            }
1431            _ => Err(ProxyError::ListenerAlreadyPresent),
1432        }
1433    }
1434
1435    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1436        let len = self.listeners.len();
1437
1438        self.listeners.retain(|_, l| l.borrow().address != address);
1439        self.listeners.len() < len
1440    }
1441
1442    pub fn activate_listener(
1443        &self,
1444        addr: &SocketAddr,
1445        tcp_listener: Option<MioTcpListener>,
1446    ) -> Result<Token, ProxyError> {
1447        let listener = self
1448            .listeners
1449            .values()
1450            .find(|listener| listener.borrow().address == *addr)
1451            .ok_or(ProxyError::NoListenerFound(*addr))?;
1452
1453        listener.borrow_mut().activate(&self.registry, tcp_listener)
1454    }
1455
1456    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1457        self.listeners
1458            .values()
1459            .filter_map(|listener| {
1460                let mut owned = listener.borrow_mut();
1461                if let Some(listener) = owned.listener.take() {
1462                    // Reset `active` so a subsequent `activate()` re-binds
1463                    // instead of short-circuiting on the stale flag.
1464                    owned.active = false;
1465                    return Some((owned.address, listener));
1466                }
1467
1468                None
1469            })
1470            .collect()
1471    }
1472
1473    pub fn give_back_listener(
1474        &mut self,
1475        address: SocketAddr,
1476    ) -> Result<(Token, MioTcpListener), ProxyError> {
1477        let listener = self
1478            .listeners
1479            .values()
1480            .find(|listener| listener.borrow().address == address)
1481            .ok_or(ProxyError::NoListenerFound(address))?;
1482
1483        let mut owned = listener.borrow_mut();
1484
1485        let taken_listener = owned
1486            .listener
1487            .take()
1488            .ok_or(ProxyError::UnactivatedListener)?;
1489
1490        // Reset `active` so a subsequent `activate()` re-binds instead of
1491        // short-circuiting on the stale flag.
1492        owned.active = false;
1493
1494        Ok((owned.token, taken_listener))
1495    }
1496
1497    /// Apply a partial-update patch to the identified TCP listener.
1498    pub fn update_listener(&mut self, patch: UpdateTcpListenerConfig) -> Result<(), ProxyError> {
1499        let address: SocketAddr = patch.address.into();
1500        let listener = self
1501            .listeners
1502            .values()
1503            .find(|l| l.borrow().address == address)
1504            .ok_or(ProxyError::NoListenerFound(address))?;
1505        listener
1506            .borrow_mut()
1507            .update_config(&patch)
1508            .map_err(|listener_error| ProxyError::ListenerActivation {
1509                address,
1510                listener_error,
1511            })
1512    }
1513
1514    pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1515        let address = front.address.into();
1516
1517        let mut listener = self
1518            .listeners
1519            .values()
1520            .find(|l| l.borrow().address == address)
1521            .ok_or(ProxyError::NoListenerFound(address))?
1522            .borrow_mut();
1523
1524        self.fronts
1525            .insert(front.cluster_id.to_string(), listener.token);
1526        listener.set_tags(address.to_string(), Some(front.tags));
1527        listener.cluster_id = Some(front.cluster_id);
1528        Ok(())
1529    }
1530
1531    pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1532        let address = front.address.into();
1533
1534        let mut listener = match self
1535            .listeners
1536            .values()
1537            .find(|l| l.borrow().address == address)
1538        {
1539            Some(l) => l.borrow_mut(),
1540            None => return Err(ProxyError::NoListenerFound(address)),
1541        };
1542
1543        listener.set_tags(address.to_string(), None);
1544        if let Some(cluster_id) = listener.cluster_id.take() {
1545            self.fronts.remove(&cluster_id);
1546        }
1547        Ok(())
1548    }
1549}
1550
1551impl ProxyConfiguration for TcpProxy {
1552    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1553        let request_type = match message.content.request_type {
1554            Some(t) => t,
1555            None => return WorkerResponse::error(message.id, "Empty request"),
1556        };
1557        match request_type {
1558            RequestType::AddTcpFrontend(front) => {
1559                if let Err(err) = self.add_tcp_front(front) {
1560                    return WorkerResponse::error(message.id, err);
1561                }
1562
1563                WorkerResponse::ok(message.id)
1564            }
1565            RequestType::RemoveTcpFrontend(front) => {
1566                if let Err(err) = self.remove_tcp_front(front) {
1567                    return WorkerResponse::error(message.id, err);
1568                }
1569
1570                WorkerResponse::ok(message.id)
1571            }
1572            RequestType::SoftStop(_) => {
1573                info!(
1574                    "{} {} processing soft shutdown",
1575                    log_module_context!(),
1576                    message.id
1577                );
1578                let listeners: HashMap<_, _> = self.listeners.drain().collect();
1579                for (_, l) in listeners.iter() {
1580                    l.borrow_mut()
1581                        .listener
1582                        .take()
1583                        .map(|mut sock| self.registry.deregister(&mut sock));
1584                }
1585                WorkerResponse::processing(message.id)
1586            }
1587            RequestType::HardStop(_) => {
1588                info!("{} {} hard shutdown", log_module_context!(), message.id);
1589                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1590                for (_, l) in listeners.drain() {
1591                    l.borrow_mut()
1592                        .listener
1593                        .take()
1594                        .map(|mut sock| self.registry.deregister(&mut sock));
1595                }
1596                WorkerResponse::ok(message.id)
1597            }
1598            RequestType::Status(_) => {
1599                info!("{} {} status", log_module_context!(), message.id);
1600                WorkerResponse::ok(message.id)
1601            }
1602            RequestType::AddCluster(cluster) => {
1603                let config = ClusterConfiguration {
1604                    proxy_protocol: cluster
1605                        .proxy_protocol
1606                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1607                    //load_balancing: cluster.load_balancing,
1608                    max_connections_per_ip: cluster.max_connections_per_ip,
1609                };
1610                self.configs.insert(cluster.cluster_id, config);
1611                WorkerResponse::ok(message.id)
1612            }
1613            RequestType::RemoveCluster(cluster_id) => {
1614                self.configs.remove(&cluster_id);
1615                WorkerResponse::ok(message.id)
1616            }
1617            RequestType::RemoveListener(remove) => {
1618                if !self.remove_listener(remove.address.into()) {
1619                    WorkerResponse::error(
1620                        message.id,
1621                        format!("no TCP listener to remove at address {:?}", remove.address),
1622                    )
1623                } else {
1624                    WorkerResponse::ok(message.id)
1625                }
1626            }
1627            command => {
1628                debug!(
1629                    "{} {} unsupported message for TCP proxy, ignoring {:?}",
1630                    log_module_context!(),
1631                    message.id,
1632                    command
1633                );
1634                WorkerResponse::error(message.id, "unsupported message")
1635            }
1636        }
1637    }
1638
1639    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1640        let internal_token = Token(token.0);
1641        if let Some(listener) = self.listeners.get(&internal_token) {
1642            if let Some(tcp_listener) = &listener.borrow().listener {
1643                tcp_listener
1644                    .accept()
1645                    .map(|(frontend_sock, _)| frontend_sock)
1646                    .map_err(|e| match e.kind() {
1647                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
1648                        _ => {
1649                            error!("{} accept() IO error: {:?}", log_module_context!(), e);
1650                            AcceptError::IoError
1651                        }
1652                    })
1653            } else {
1654                Err(AcceptError::IoError)
1655            }
1656        } else {
1657            Err(AcceptError::IoError)
1658        }
1659    }
1660
1661    fn create_session(
1662        &mut self,
1663        mut frontend_sock: MioTcpStream,
1664        token: ListenToken,
1665        wait_time: Duration,
1666        proxy: Rc<RefCell<Self>>,
1667    ) -> Result<(), AcceptError> {
1668        let listener_token = Token(token.0);
1669
1670        let listener = self
1671            .listeners
1672            .get(&listener_token)
1673            .ok_or(AcceptError::IoError)?;
1674
1675        let owned = listener.borrow();
1676        let mut pool = self.pool.borrow_mut();
1677
1678        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1679            (Some(fb), Some(bb)) => (fb, bb),
1680            _ => {
1681                error!("{} could not get buffers from pool", log_module_context!());
1682                error!(
1683                    "{} Buffer capacity has been reached, stopping to accept new connections for now",
1684                    log_module_context!()
1685                );
1686                gauge!(names::accept_queue::BACKPRESSURE, 1);
1687                self.sessions.borrow_mut().can_accept = false;
1688
1689                return Err(AcceptError::BufferCapacityReached);
1690            }
1691        };
1692
1693        if owned.cluster_id.is_none() {
1694            error!(
1695                "{} listener at address {:?} has no linked cluster",
1696                log_module_context!(),
1697                owned.address
1698            );
1699            return Err(AcceptError::IoError);
1700        }
1701
1702        let proxy_protocol = self
1703            .configs
1704            .get(owned.cluster_id.as_ref().unwrap())
1705            .and_then(|c| c.proxy_protocol);
1706
1707        if let Err(e) = frontend_sock.set_nodelay(true) {
1708            error!(
1709                "{} error setting nodelay on front socket({:?}): {:?}",
1710                log_module_context!(),
1711                frontend_sock,
1712                e
1713            );
1714        }
1715
1716        let mut session_manager = self.sessions.borrow_mut();
1717        let entry = session_manager.slab.vacant_entry();
1718        let frontend_token = Token(entry.key());
1719
1720        if let Err(register_error) = self.registry.register(
1721            &mut frontend_sock,
1722            frontend_token,
1723            Interest::READABLE | Interest::WRITABLE,
1724        ) {
1725            error!(
1726                "{} error registering front socket({:?}): {:?}",
1727                log_module_context!(),
1728                frontend_sock,
1729                register_error
1730            );
1731            return Err(AcceptError::RegisterError);
1732        }
1733
1734        let session = TcpSession::new(
1735            back_buffer,
1736            None,
1737            owned.cluster_id.clone(),
1738            Duration::from_secs(owned.config.back_timeout as u64),
1739            Duration::from_secs(owned.config.connect_timeout as u64),
1740            Duration::from_secs(owned.config.front_timeout as u64),
1741            front_buffer,
1742            frontend_token,
1743            listener.clone(),
1744            proxy_protocol,
1745            proxy,
1746            frontend_sock,
1747            wait_time,
1748        );
1749        incr!(names::tcp::REQUESTS);
1750
1751        let session = Rc::new(RefCell::new(session));
1752        entry.insert(session);
1753
1754        Ok(())
1755    }
1756}
1757
1758pub mod testing {
1759    use crate::testing::*;
1760
1761    /// This is not directly used by Sōzu but is available for example and testing purposes
1762    pub fn start_tcp_worker(
1763        config: TcpListenerConfig,
1764        max_buffers: usize,
1765        buffer_size: usize,
1766        channel: ProxyChannel,
1767    ) -> anyhow::Result<()> {
1768        let address = config.address.into();
1769
1770        let ServerParts {
1771            event_loop,
1772            registry,
1773            sessions,
1774            pool,
1775            backends,
1776            client_scm_socket: _,
1777            server_scm_socket,
1778            server_config,
1779        } = prebuild_server(max_buffers, buffer_size, true)?;
1780
1781        let token = {
1782            let mut sessions = sessions.borrow_mut();
1783            let entry = sessions.slab.vacant_entry();
1784            let key = entry.key();
1785            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1786                protocol: Protocol::TCPListen,
1787            })));
1788            Token(key)
1789        };
1790
1791        let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1792        proxy
1793            .add_listener(config, token)
1794            .with_context(|| "Failed at creating adding the listener")?;
1795        proxy
1796            .activate_listener(&address, None)
1797            .with_context(|| "Failed at creating activating the listener")?;
1798
1799        let mut server = Server::new(
1800            event_loop,
1801            channel,
1802            server_scm_socket,
1803            sessions,
1804            pool,
1805            backends,
1806            None,
1807            None,
1808            Some(proxy),
1809            server_config,
1810            None,
1811            false,
1812        )
1813        .with_context(|| "Failed at creating server")?;
1814
1815        debug!("{} starting event loop", log_module_context!());
1816        server.run();
1817        debug!("{} ending event loop", log_module_context!());
1818        Ok(())
1819    }
1820}
1821
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        net::{Shutdown, TcpListener, TcpStream},
        str,
        sync::{
            Arc, Barrier,
            atomic::{AtomicBool, Ordering},
        },
        thread,
        time::Duration,
    };

    use sozu_command::{
        channel::Channel,
        config::ListenerBuilder,
        proto::command::{
            LoadBalancingParams, RequestTcpFrontend, SocketAddress, SoftStop, WorkerRequest,
            WorkerResponse, request::RequestType,
        },
    };

    use super::testing::start_tcp_worker;
    use crate::testing::*;

    /*
    #[test]
    #[cfg(target_pointer_width = "64")]
    fn size_test() {
      assert_size!(Pipe<mio::net::TcpStream>, 224);
      assert_size!(SendProxyProtocol<mio::net::TcpStream>, 144);
      assert_size!(RelayProxyProtocol<mio::net::TcpStream>, 152);
      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
      assert_size!(State, 528);
      // fails depending on the platform?
      //assert_size!(Session, 808);
    }*/

    /// End-to-end check through the proxy. Three clients connect to the front
    /// port. `s1` and `s2` send data and must get it echoed back by the backend.
    /// `s3` is shut down without sending anything.
    #[test]
    fn round_trip() {
        setup_test_logger!();
        let barrier = Arc::new(Barrier::new(2));
        let test_finished = Arc::new(AtomicBool::new(false));

        let front_port1 = provide_port();
        let front_port2 = provide_port();

        let backend_port = start_server(barrier.clone(), test_finished.clone());
        let mut command =
            start_proxy(backend_port, front_port1, front_port2).expect("Could not start proxy");
        // Release the echo server's accept loop now that the proxy is configured.
        barrier.wait();

        let front_addr = format!("127.0.0.1:{front_port1}");

        let mut s1 = TcpStream::connect(&front_addr).expect("could not connect");
        s1.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s1");

        let s3 = TcpStream::connect(&front_addr).expect("could not connect");

        let mut s2 = TcpStream::connect(&front_addr).expect("could not connect");
        s2.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s2");

        s1.write_all(b"hello ").expect("could not write to s1");
        println!("s1 sent");

        s2.write_all(b"pouet pouet").expect("could not write to s2");
        println!("s2 sent");

        let mut res = [0; 128];
        s1.write_all(b"coucou").expect("could not write to s1");

        s3.shutdown(Shutdown::Both).expect("could not shutdown s3");

        let sz2 = s2
            .read(&mut res[..])
            .expect("could not read from socket s2");
        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);

        // Read in a loop: a single read() on a TCP stream is not
        // guaranteed to return all echoed data if the second write's
        // round trip (client → proxy → backend → proxy → client) is
        // still in flight when we poll.
        let expected = b"hello coucou";
        let mut total = 0;
        while total < expected.len() {
            let sz = s1
                .read(&mut res[total..])
                .expect("could not read from socket s1");
            assert!(sz > 0, "connection closed before receiving all data");
            total += sz;
        }
        println!(
            "s1 received again({}): {:?}",
            total,
            str::from_utf8(&res[..total])
        );
        assert_eq!(&res[..total], &expected[..]);

        // Signal the echo server to stop
        test_finished.store(true, Ordering::Relaxed);

        // Send SoftStop to the sozu worker so server.run() exits cleanly
        command
            .write_message(&WorkerRequest {
                id: "ID_SOFTSTOP".to_owned(),
                content: RequestType::SoftStop(SoftStop {}).into(),
            })
            .expect("could not send SoftStop to sozu worker");
    }

    /// Start an echo server on an ephemeral port.
    /// Returns the port the server is listening on.
    ///
    /// The accept loop waits on `barrier` before accepting. Both the accept
    /// loop and the per-client handlers stop once `test_finished` is set.
    fn start_server(barrier: Arc<Barrier>, test_finished: Arc<AtomicBool>) -> u16 {
        let listener =
            TcpListener::bind("127.0.0.1:0").expect("could not bind echo server listener");
        let port = listener
            .local_addr()
            .expect("could not get echo server local address")
            .port();

        // Non-blocking so the accept loop can poll `test_finished` between attempts.
        listener
            .set_nonblocking(true)
            .expect("could not set echo server listener to non-blocking");

        thread::spawn(move || {
            barrier.wait();
            let mut count: u8 = 0;
            loop {
                match listener.accept() {
                    Ok((mut stream, _)) => {
                        let finished = test_finished.clone();
                        thread::spawn(move || {
                            println!("got a new client: {count}");
                            // On BSD-derived systems (e.g. macOS) an accepted
                            // socket inherits O_NONBLOCK from the listener. That
                            // would make `read` spin on WouldBlock and
                            // `write_all` fail. Force blocking mode so the read
                            // timeout below governs waiting.
                            stream
                                .set_nonblocking(false)
                                .expect("could not set echo client to blocking");
                            // The timeout lets an idle handler periodically
                            // re-check `finished`.
                            stream
                                .set_read_timeout(Some(Duration::from_secs(2)))
                                .expect("could not set read timeout on echo client");
                            let mut buf = [0; 128];
                            loop {
                                match stream.read(&mut buf[..]) {
                                    Ok(0) => break,
                                    Ok(sz) => {
                                        println!(
                                            "ECHO[{count}] got \"{:?}\"",
                                            str::from_utf8(&buf[..sz])
                                        );
                                        stream
                                            .write_all(&buf[..sz])
                                            .expect("could not echo data back");
                                    }
                                    Err(ref e)
                                        if e.kind() == std::io::ErrorKind::WouldBlock
                                            || e.kind() == std::io::ErrorKind::TimedOut =>
                                    {
                                        if finished.load(Ordering::Relaxed) {
                                            println!("backend server stopping (client handler)");
                                            break;
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        });
                        count = count.wrapping_add(1);
                    }
                    Err(e) => {
                        if e.kind() != std::io::ErrorKind::WouldBlock {
                            println!("connection failed: {e:?}");
                        }
                        // Re-check the stop flag and back off on every error,
                        // not only WouldBlock, so a persistent accept error
                        // neither busy-spins nor outlives the test.
                        if test_finished.load(Ordering::Relaxed) {
                            println!("backend server stopping (accept loop)");
                            break;
                        }
                        thread::sleep(Duration::from_millis(50));
                    }
                }
            }
        });

        port
    }

    /// Start a sozu TCP proxy worker and configure cluster `yolo`.
    ///
    /// The worker is created with a single listener on `front_port1`. For each
    /// of `front_port1` and `front_port2`, an `AddTcpFrontend` request and an
    /// `AddBackend` request (pointing at `backend_port`) are sent. The responses
    /// are read and printed, not checked.
    fn start_proxy(
        backend_port: u16,
        front_port1: u16,
        front_port2: u16,
    ) -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, front_port1))
            .to_tcp(None)
            .expect("could not create listener config");

        let (mut command, channel) =
            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
        let _jg = thread::spawn(move || {
            setup_test_logger!();
            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
        });

        command
            .blocking()
            .expect("could not set command channel to blocking");

        // (front port, frontend request id, backend request id, label for panic messages)
        let setups = [
            (front_port1, "ID_YOLO1", "ID_YOLO2", "front1"),
            (front_port2, "ID_YOLO3", "ID_YOLO4", "front2"),
        ];
        // Each setup sends two requests, and each request gets one response.
        let expected_responses = 2 * setups.len();

        for (front_port, front_req_id, backend_req_id, label) in setups {
            let front = RequestTcpFrontend {
                cluster_id: "yolo".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
                ..Default::default()
            };
            let backend = sozu_command::response::Backend {
                cluster_id: "yolo".to_owned(),
                backend_id: "yolo-0".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };

            command
                .write_message(&WorkerRequest {
                    id: front_req_id.to_owned(),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .unwrap_or_else(|e| panic!("could not send AddTcpFrontend for {label}: {e:?}"));
            command
                .write_message(&WorkerRequest {
                    id: backend_req_id.to_owned(),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .unwrap_or_else(|e| panic!("could not send AddBackend for {label}: {e:?}"));
        }

        for _ in 0..expected_responses {
            println!(
                "read_message: {:?}",
                command
                    .read_message()
                    .with_context(|| "could not read message")?
            );
        }

        Ok(command)
    }
}