Skip to main content

sozu_lib/
tcp.rs

1use std::{
2    cell::RefCell,
3    collections::{BTreeMap, HashMap, hash_map::Entry},
4    io::ErrorKind,
5    net::{Shutdown, SocketAddr},
6    os::unix::io::AsRawFd,
7    rc::Rc,
8    time::{Duration, Instant},
9};
10
11use mio::{
12    Interest, Registry, Token,
13    net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14    unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18    ObjectKind,
19    config::MAX_LOOP_ITERATIONS,
20    logging::{EndpointRecord, LogContext, ansi_palette},
21    proto::command::request::RequestType,
22};
23
24use crate::metrics::names;
25use crate::{
26    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
27    ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
28    Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
29    backends::{Backend, BackendMap},
30    pool::{Checkout, Pool},
31    protocol::{
32        Pipe,
33        pipe::WebSocketContext,
34        proxy_protocol::{
35            expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
36        },
37    },
38    retry::RetryPolicy,
39    server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
40    socket::{server_bind, stats::socket_rtt},
41    sozu_command::{
42        proto::command::{
43            Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
44            UpdateTcpListenerConfig, WorkerRequest, WorkerResponse,
45        },
46        ready::Ready,
47        state::ClusterId,
48    },
49    timer::TimeoutContainer,
50};
51
// Generates the `TcpStateMachine` enum stored in `TcpSession::state`.
// NOTE(review): besides the four variants listed here, the rest of this file
// matches on a `TcpStateMachine::FailedUpgrade(_)` variant and calls `take()`
// and `front_socket()` on the state, so the macro presumably generates those
// as well — confirm against the `StateMachineBuilder!` definition.
StateMachineBuilder! {
    /// The various Stages of a TCP connection:
    ///
    /// 1. optional (ExpectProxyProtocol | SendProxyProtocol | RelayProxyProtocol)
    /// 2. Pipe
    enum TcpStateMachine {
        Pipe(Pipe<MioTcpStream, TcpListener>),
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
64
/// Builds the per-session prefix prepended to log lines emitted from
/// [`TcpSession`] methods.
///
/// The expression passed as `$self` must expose a `log_context()` method, a
/// `frontend_token` field and an optional `backend_token` field. The result
/// is a `String` made of the session's log context, a `TCP` protocol label
/// and `Session(frontend=<token>, backend=<token>)`, terminated by ` >>>`.
/// A missing backend token is rendered as `<none>`.
///
/// This macro is defined uniquely in this module to help the tracking of
/// issues inside Sōzu. Colored output uses the unified log-context scheme:
/// bold bright-white protocol label, light-grey `Session` keyword, gray keys
/// and bright-white values.
///
/// NOTE(review): the previous wording said "kawa h1 issues", which looks
/// inherited from the HTTP module — confirm whether it applies to TCP.
macro_rules! log_context {
    ($self:expr) => {{
        // Escape sequences for the current logger mode, in the order the
        // format string below consumes them.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "{gray}{ctx}{reset}\t{open}TCP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.log_context(),
            frontend = $self.frontend_token.0,
            // The backend token only exists once a backend has been wired.
            backend = $self
                .backend_token
                .map(|token| token.0.to_string())
                .unwrap_or_else(|| "<none>".to_string()),
        )
    }};
}
88
/// Module-level prefix for log lines emitted from this file when no
/// [`TcpSession`] is in scope. Produces a bold bright-white `TCP` label
/// (uniform with the per-session `log_context!`) when the logger is in
/// colored mode. Used by [`TcpProxy`] callbacks (notify, accept,
/// create_session, soft_stop, hard_stop, status) and the `testing`
/// helper module which own a listener/token map but have no
/// `frontend_token` of their own.
///
/// Expands to a `String` of the form `TCP\t >>>` wrapped in the palette's
/// opening and reset sequences.
macro_rules! log_module_context {
    () => {{
        // Only the opening and reset sequences are needed for the bare label;
        // the three remaining palette entries are deliberately ignored.
        let (open, reset, _, _, _) = sozu_command::logging::ansi_palette();
        format!("{open}TCP{reset}\t >>>", open = open, reset = reset)
    }};
}
102
/// State of one proxied TCP connection: the frontend socket (owned by the
/// `state` machine), the optional backend connection, their timeouts and the
/// bookkeeping needed for metrics and access logs.
pub struct TcpSession {
    /// Backend-side buffer parked on the session while a proxy-protocol
    /// state is active; handed to the `Pipe` on upgrade. `None` when the
    /// initial `Pipe` state consumed it directly.
    backend_buffer: Option<Checkout>,
    /// Connection status of the backend socket; starts as `NotConnected`.
    backend_connected: BackendConnectionStatus,
    /// Identifier of the selected backend, if any; reported in the log context.
    backend_id: Option<String>,
    /// Token of the backend socket; set by `set_back_token`, cleared by
    /// `remove_backend`.
    backend_token: Option<Token>,
    /// Shared handle to the selected backend; `remove_backend` releases it
    /// and decrements the backend's connection count.
    backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster this session is attached to, if any.
    cluster_id: Option<String>,
    /// Duration applied to `container_backend_timeout` once the backend is
    /// connected.
    configured_backend_timeout: Duration,
    /// Number of backend connection retries so far; `check_invariants`
    /// asserts it never exceeds `CONN_RETRIES`.
    connection_attempt: u8,
    /// Backend timeout: created empty with the connect timeout, then switched
    /// to `configured_backend_timeout` when the backend becomes connected.
    container_backend_timeout: TimeoutContainer,
    /// Frontend timeout, created from the configured frontend timeout and
    /// `frontend_token`.
    container_frontend_timeout: TimeoutContainer,
    /// Peer address of the frontend socket captured at session creation
    /// (`None` if `peer_addr()` failed).
    frontend_address: Option<SocketAddr>,
    /// Frontend-side buffer parked on the session until the upgrade to
    /// `Pipe`; `None` when the initial state consumed it directly.
    frontend_buffer: Option<Checkout>,
    /// Token of the frontend socket.
    frontend_token: Token,
    /// Set once the session has been closed (see `check_invariants`).
    has_been_closed: SessionIsToBeClosed,
    /// Initialised to `Instant::now()` at creation.
    /// NOTE(review): presumably refreshed on each event — the update site is
    /// not visible in this part of the file.
    last_event: Instant,
    /// Listener this session belongs to; provides the tags and address used
    /// in access logs.
    listener: Rc<RefCell<TcpListener>>,
    /// Per-session metrics (timings, byte counters) reported in access logs.
    metrics: SessionMetrics,
    /// Shared handle to the owning proxy.
    proxy: Rc<RefCell<TcpProxy>>,
    /// ULID generated at creation; used as both session id and request id in
    /// the log context.
    request_id: Ulid,
    /// Current protocol state (proxy-protocol handshake or `Pipe`).
    state: TcpStateMachine,
    /// `true` once `connect_to_backend` has accounted this session
    /// against the per-(cluster, source-IP) connection counter. Drives
    /// the symmetric `untrack_all_cluster_ip` call in `close`. The flag
    /// is per-session, not per-attempt: a TCP session has at most one
    /// `(cluster, ip)` slot, so the SessionManager-side idempotency
    /// already covers retries — this flag exists only to short-circuit
    /// the close path's untrack when the feature is disabled or no
    /// admit ever ran.
    cluster_ip_tracked: bool,
}
134
135impl TcpSession {
136    #[allow(clippy::too_many_arguments)]
137    fn new(
138        backend_buffer: Checkout,
139        backend_id: Option<String>,
140        cluster_id: Option<String>,
141        configured_backend_timeout: Duration,
142        configured_connect_timeout: Duration,
143        configured_frontend_timeout: Duration,
144        frontend_buffer: Checkout,
145        frontend_token: Token,
146        listener: Rc<RefCell<TcpListener>>,
147        proxy_protocol: Option<ProxyProtocolConfig>,
148        proxy: Rc<RefCell<TcpProxy>>,
149        socket: MioTcpStream,
150        wait_time: Duration,
151    ) -> TcpSession {
152        let frontend_address = socket.peer_addr().ok();
153        let mut frontend_buffer_session = None;
154        let mut backend_buffer_session = None;
155
156        let request_id = Ulid::generate();
157
158        let container_frontend_timeout =
159            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
160        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);
161
162        let state = match proxy_protocol {
163            Some(ProxyProtocolConfig::RelayHeader) => {
164                backend_buffer_session = Some(backend_buffer);
165                gauge_add!(names::protocol::PROXY_RELAY, 1);
166                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
167                    socket,
168                    frontend_token,
169                    request_id,
170                    None,
171                    frontend_buffer,
172                ))
173            }
174            Some(ProxyProtocolConfig::ExpectHeader) => {
175                frontend_buffer_session = Some(frontend_buffer);
176                backend_buffer_session = Some(backend_buffer);
177                gauge_add!(names::protocol::PROXY_EXPECT, 1);
178                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
179                    container_frontend_timeout.clone(),
180                    socket,
181                    frontend_token,
182                    request_id,
183                ))
184            }
185            Some(ProxyProtocolConfig::SendHeader) => {
186                frontend_buffer_session = Some(frontend_buffer);
187                backend_buffer_session = Some(backend_buffer);
188                gauge_add!(names::protocol::PROXY_SEND, 1);
189                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
190                    socket,
191                    frontend_token,
192                    request_id,
193                    None,
194                ))
195            }
196            None => {
197                gauge_add!(names::protocol::TCP, 1);
198                let mut pipe = Pipe::new(
199                    backend_buffer,
200                    backend_id.clone(),
201                    None,
202                    None,
203                    None,
204                    None,
205                    cluster_id.clone(),
206                    frontend_buffer,
207                    frontend_token,
208                    socket,
209                    listener.clone(),
210                    Protocol::TCP,
211                    request_id,
212                    request_id,
213                    frontend_address,
214                    WebSocketContext::Tcp,
215                );
216                pipe.set_cluster_id(cluster_id.clone());
217                TcpStateMachine::Pipe(pipe)
218            }
219        };
220
221        let metrics = SessionMetrics::new(Some(wait_time));
222        //FIXME: timeout usage
223
224        TcpSession {
225            backend_buffer: backend_buffer_session,
226            backend_connected: BackendConnectionStatus::NotConnected,
227            backend_id,
228            backend_token: None,
229            backend: None,
230            cluster_id,
231            configured_backend_timeout,
232            connection_attempt: 0,
233            container_backend_timeout,
234            container_frontend_timeout,
235            frontend_address,
236            frontend_buffer: frontend_buffer_session,
237            frontend_token,
238            has_been_closed: false,
239            last_event: Instant::now(),
240            listener,
241            metrics,
242            proxy,
243            request_id,
244            state,
245            cluster_ip_tracked: false,
246        }
247    }
248
249    /// Source-IP for per-(cluster, source-IP) accounting.
250    ///
251    /// Prefer the parsed PROXY-v2 source from whichever upgrade phase is
252    /// in flight, then the post-upgrade `Pipe.session_address`, finally
253    /// the raw TCP `peer_addr` captured at session creation. The
254    /// `Pipe::session_address` itself is already PROXY-v2-aware after
255    /// `expect.rs::into_pipe` and `relay.rs::into_pipe`.
256    fn effective_session_address(&self) -> Option<SocketAddr> {
257        match &self.state {
258            TcpStateMachine::Pipe(pipe) => pipe.get_session_address(),
259            TcpStateMachine::ExpectProxyProtocol(epp) => {
260                epp.addresses.as_ref().and_then(|pa| pa.source())
261            }
262            TcpStateMachine::RelayProxyProtocol(rpp) => {
263                rpp.addresses.as_ref().and_then(|pa| pa.source())
264            }
265            TcpStateMachine::SendProxyProtocol(_) | TcpStateMachine::FailedUpgrade(_) => None,
266        }
267        .or(self.frontend_address)
268    }
269
    /// Emits the access-log entry for this session.
    ///
    /// Registers the end of the session on the metrics, then writes one
    /// `info_access!` record tagged with the listener's tags. If the record
    /// cannot be sent, the `UNSENT` access-log counter is incremented.
    ///
    /// Note that `session_address` is the raw `frontend_address` captured at
    /// creation, not the value of `effective_session_address()`.
    fn log_request(&self) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        self.metrics.register_end_of_session(&context);
        info_access!(
            on_failure: { incr!(names::access_logs::UNSENT) },
            message: None,
            context,
            session_address: self.frontend_address,
            backend_address: None,
            protocol: "TCP",
            endpoint: EndpointRecord::Tcp,
            // Tags are looked up by the listener's address rendered as a string.
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.state.front_socket()),
            server_rtt: None,
            user_agent: None,
            x_request_id: None,
            // TCP listener accepts a raw `MioTcpStream` (lib/src/tcp.rs:128)
            // — Sōzu does not terminate TLS on the TCP path, so all five TLS
            // fields and the parsed XFF chain are always absent here.
            tls_version: None,
            tls_cipher: None,
            tls_sni: None,
            tls_alpn: None,
            xff_chain: None,
            service_time: self.metrics.service_time(),
            response_time: self.metrics.backend_response_time(),
            request_time: self.metrics.request_time(),
            start_time_ns: self.metrics.start_wall_ns(),
            bytes_in: self.metrics.bin,
            bytes_out: self.metrics.bout,
            otel: None,
        );
    }
304
305    fn front_hup(&mut self) -> SessionResult {
306        match &mut self.state {
307            TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
308            _ => {
309                self.log_request();
310                SessionResult::Close
311            }
312        }
313    }
314
315    fn back_hup(&mut self) -> SessionResult {
316        match &mut self.state {
317            TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
318            _ => {
319                self.log_request();
320                SessionResult::Close
321            }
322        }
323    }
324
325    fn log_context(&self) -> LogContext<'_> {
326        LogContext {
327            session_id: self.request_id,
328            request_id: Some(self.request_id),
329            cluster_id: self.cluster_id.as_deref(),
330            backend_id: self.backend_id.as_deref(),
331        }
332    }
333
334    fn readable(&mut self) -> SessionResult {
335        if !self.container_frontend_timeout.reset() {
336            error!(
337                "{} Could not reset frontend timeout on readable",
338                log_context!(self)
339            );
340        }
341        if self.backend_connected == BackendConnectionStatus::Connected
342            && !self.container_backend_timeout.reset()
343        {
344            error!(
345                "{} Could not reset backend timeout on readable",
346                log_context!(self)
347            );
348        }
349        match &mut self.state {
350            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
351            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
352            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
353            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
354            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
355        }
356    }
357
358    fn writable(&mut self) -> SessionResult {
359        match &mut self.state {
360            TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
361            _ => SessionResult::Continue,
362        }
363    }
364
365    fn back_readable(&mut self) -> SessionResult {
366        if !self.container_frontend_timeout.reset() {
367            error!(
368                "{} Could not reset frontend timeout on back_readable",
369                log_context!(self)
370            );
371        }
372        if !self.container_backend_timeout.reset() {
373            error!(
374                "{} Could not reset backend timeout on back_readable",
375                log_context!(self)
376            );
377        }
378
379        match &mut self.state {
380            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
381            _ => SessionResult::Continue,
382        }
383    }
384
385    fn back_writable(&mut self) -> SessionResult {
386        match &mut self.state {
387            TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
388            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
389            TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
390            TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
391            TcpStateMachine::FailedUpgrade(_) => {
392                unreachable!()
393            }
394        }
395    }
396
397    fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
398        match &mut self.state {
399            TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
400            TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
401            TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
402            TcpStateMachine::ExpectProxyProtocol(_) => None,
403            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
404        }
405    }
406
407    pub fn upgrade(&mut self) -> SessionIsToBeClosed {
408        let new_state = match self.state.take() {
409            TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
410            TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
411            TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
412            TcpStateMachine::Pipe(_) => None,
413            TcpStateMachine::FailedUpgrade(_) => todo!(),
414        };
415
416        match new_state {
417            Some(state) => {
418                self.state = state;
419                false
420            } // The state stays FailedUpgrade, but the Session should be closed right after
421
422            None => true,
423        }
424    }
425
426    fn upgrade_send(
427        &mut self,
428        send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
429    ) -> Option<TcpStateMachine> {
430        if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
431            let mut pipe = send_proxy_protocol.into_pipe(
432                self.frontend_buffer.take().unwrap(),
433                self.backend_buffer.take().unwrap(),
434                self.listener.clone(),
435            );
436
437            pipe.set_cluster_id(self.cluster_id.clone());
438            gauge_add!(names::protocol::PROXY_SEND, -1);
439            gauge_add!(names::protocol::TCP, 1);
440            return Some(TcpStateMachine::Pipe(pipe));
441        }
442
443        error!(
444            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
445            log_context!(self)
446        );
447        None
448    }
449
450    fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
451        if self.backend_buffer.is_some() {
452            let mut pipe =
453                rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
454            pipe.set_cluster_id(self.cluster_id.clone());
455            gauge_add!(names::protocol::PROXY_RELAY, -1);
456            gauge_add!(names::protocol::TCP, 1);
457            return Some(TcpStateMachine::Pipe(pipe));
458        }
459
460        error!(
461            "{} Missing the backend buffer queue, we can't switch to a pipe",
462            log_context!(self)
463        );
464        None
465    }
466
467    fn upgrade_expect(
468        &mut self,
469        epp: ExpectProxyProtocol<MioTcpStream>,
470    ) -> Option<TcpStateMachine> {
471        if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
472            let mut pipe = epp.into_pipe(
473                self.frontend_buffer.take().unwrap(),
474                self.backend_buffer.take().unwrap(),
475                None,
476                None,
477                self.listener.clone(),
478            );
479
480            pipe.set_cluster_id(self.cluster_id.clone());
481            gauge_add!(names::protocol::PROXY_EXPECT, -1);
482            gauge_add!(names::protocol::TCP, 1);
483            return Some(TcpStateMachine::Pipe(pipe));
484        }
485
486        error!(
487            "{} Missing the backend buffer queue, we can't switch to a pipe",
488            log_context!(self)
489        );
490        None
491    }
492
493    fn front_readiness(&mut self) -> &mut Readiness {
494        match &mut self.state {
495            TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
496            TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
497            TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
498            TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
499            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
500        }
501    }
502
503    fn back_readiness(&mut self) -> Option<&mut Readiness> {
504        match &mut self.state {
505            TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
506            TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
507            TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
508            TcpStateMachine::ExpectProxyProtocol(_) => None,
509            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
510        }
511    }
512
513    fn set_back_socket(&mut self, socket: MioTcpStream) {
514        match &mut self.state {
515            TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
516            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
517            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
518            TcpStateMachine::ExpectProxyProtocol(_) => {
519                error!(
520                    "{} We should not set the back socket for the expect proxy protocol",
521                    log_context!(self)
522                );
523                panic!(
524                    "{} We should not set the back socket for the expect proxy protocol",
525                    log_context!(self)
526                );
527            }
528            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
529        }
530    }
531
532    fn set_back_token(&mut self, token: Token) {
533        // The frontend must own a token distinct from the backend's: the two
534        // index different slab slots, so wiring the same token to both would
535        // alias two sessions onto one slot.
536        debug_assert_ne!(
537            token, self.frontend_token,
538            "backend token must differ from the frontend token"
539        );
540        self.backend_token = Some(token);
541
542        match &mut self.state {
543            TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
544            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
545            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
546            TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
547            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
548        }
549
550        // Postcondition: the session now owns exactly the token it was asked
551        // to register — every arm above (including the Expect arm, which only
552        // stores the session-side token) leaves `backend_token == Some(token)`.
553        debug_assert_eq!(
554            self.backend_token,
555            Some(token),
556            "set_back_token must leave the session owning the registered token"
557        );
558    }
559
560    fn set_backend_id(&mut self, id: String) {
561        self.backend_id = Some(id.clone());
562        if let TcpStateMachine::Pipe(pipe) = &mut self.state {
563            pipe.set_backend_id(Some(id));
564        }
565    }
566
    /// Returns the current backend connection status (a copy of
    /// `backend_connected`).
    fn back_connected(&self) -> BackendConnectionStatus {
        self.backend_connected
    }
570
571    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
572        let last = self.backend_connected;
573        // Transitioning INTO `Connected` bumps the backend-connection gauge by
574        // exactly +1. Doing so from an already-`Connected` state would
575        // double-count (gauge drift that only `close_backend`'s single -1
576        // would later reconcile, leaving the gauge permanently +1). The
577        // promotion always comes from a `Connecting` (the normal handshake
578        // completion in `ready_inner`) — never from `Connected` itself.
579        debug_assert!(
580            status != BackendConnectionStatus::Connected
581                || last != BackendConnectionStatus::Connected,
582            "set_back_connected(Connected) must not run on an already-Connected backend (gauge would double-count)"
583        );
584        self.backend_connected = status;
585
586        // Postcondition: the requested status is now in effect.
587        debug_assert_eq!(
588            self.backend_connected, status,
589            "set_back_connected must record the requested status"
590        );
591
592        if status == BackendConnectionStatus::Connected {
593            gauge_add!(names::backend::CONNECTIONS, 1);
594            gauge_add!(
595                names::backend::CONNECTIONS_PER_BACKEND,
596                1,
597                self.cluster_id.as_deref(),
598                self.metrics.backend_id.as_deref()
599            );
600
601            // the back timeout was of connect_timeout duration before,
602            // now that we're connected, move to backend_timeout duration
603            self.container_backend_timeout
604                .set_duration(self.configured_backend_timeout);
605            self.container_frontend_timeout.reset();
606
607            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
608                spp.set_back_connected(BackendConnectionStatus::Connected);
609            }
610
611            if let Some(backend) = self.backend.as_ref() {
612                let mut backend = backend.borrow_mut();
613
614                if backend.retry_policy.is_down() {
615                    incr!(
616                        "backend.up",
617                        self.cluster_id.as_deref(),
618                        self.metrics.backend_id.as_deref()
619                    );
620                    gauge!(
621                        names::backend::AVAILABLE,
622                        1,
623                        self.cluster_id.as_deref(),
624                        self.metrics.backend_id.as_deref()
625                    );
626                    info!(
627                        "{} backend server {} at {} is up",
628                        log_context!(self),
629                        backend.backend_id,
630                        backend.address
631                    );
632                    push_event(Event {
633                        kind: EventKind::BackendUp as i32,
634                        backend_id: Some(backend.backend_id.to_owned()),
635                        address: Some(backend.address.into()),
636                        cluster_id: None,
637                        metric_detail: None,
638                    });
639                }
640
641                if let BackendConnectionStatus::Connecting(start) = last {
642                    backend.set_connection_time(Instant::now() - start);
643                }
644
645                //successful connection, rest failure counter
646                backend.failures = 0;
647                backend.retry_policy.succeed();
648            }
649        }
650    }
651
652    fn remove_backend(&mut self) {
653        if let Some(backend) = self.backend.take() {
654            (*backend.borrow_mut()).dec_connections();
655        }
656
657        self.backend_token = None;
658
659        // Postcondition: the backend handle and its token are torn down
660        // together — neither may outlive the other (a dangling token would
661        // leave a stale slab reference; a dangling handle would over-count
662        // backend connections).
663        debug_assert!(
664            self.backend.is_none(),
665            "remove_backend must release the backend handle"
666        );
667        debug_assert!(
668            self.backend_token.is_none(),
669            "remove_backend must clear the backend token"
670        );
671    }
672
673    fn fail_backend_connection(&mut self) {
674        if let Some(backend) = self.backend.as_ref() {
675            let backend = &mut *backend.borrow_mut();
676            backend.failures += 1;
677
678            let already_unavailable = backend.retry_policy.is_down();
679            backend.retry_policy.fail();
680            incr!(
681                "backend.connections.error",
682                self.cluster_id.as_deref(),
683                self.metrics.backend_id.as_deref()
684            );
685            if !already_unavailable && backend.retry_policy.is_down() {
686                error!(
687                    "{} backend server {} at {} is down",
688                    log_context!(self),
689                    backend.backend_id,
690                    backend.address
691                );
692                incr!(
693                    "backend.down",
694                    self.cluster_id.as_deref(),
695                    self.metrics.backend_id.as_deref()
696                );
697                gauge!(
698                    names::backend::AVAILABLE,
699                    0,
700                    self.cluster_id.as_deref(),
701                    self.metrics.backend_id.as_deref()
702                );
703
704                push_event(Event {
705                    kind: EventKind::BackendDown as i32,
706                    backend_id: Some(backend.backend_id.to_owned()),
707                    address: Some(backend.address.into()),
708                    cluster_id: None,
709                    metric_detail: None,
710                });
711            }
712        }
713    }
714
715    pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
716        match self.back_socket_mut() {
717            Some(ref mut s) => {
718                let mut tmp = [0u8; 1];
719                let res = s.peek(&mut tmp[..]);
720
721                match res {
722                    // if the socket is half open, it will report 0 bytes read (EOF)
723                    Ok(0) => false,
724                    Ok(_) => true,
725                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
726                }
727            }
728            None => false,
729        }
730    }
731
    /// Cancels both the frontend and the backend timeout of this session.
    pub fn cancel_timeouts(&mut self) {
        self.container_frontend_timeout.cancel();
        self.container_backend_timeout.cancel();
    }
736
737    /// Full cross-field invariant sweep for the TCP session state machine.
738    ///
739    /// Run as a run-to-completion postcondition at the END of `ready()` (the
740    /// only public entry point that drives the front/back token + readiness
741    /// state machine). These are OUR-logic invariants — never reachable from
742    /// hostile traffic — so a violation is a bug in Sōzu, not a malformed
743    /// peer. Compiled out in release.
744    #[cfg(debug_assertions)]
745    fn check_invariants(&self) {
746        // Connection-attempt budget: every retry path increments
747        // `connection_attempt` and `connect_to_backend` refuses once the
748        // counter reaches `CONN_RETRIES`, so the value can touch but never
749        // exceed the configured ceiling (and resets to 0 on success).
750        debug_assert!(
751            self.connection_attempt <= CONN_RETRIES,
752            "connection_attempt ({}) must never exceed CONN_RETRIES ({})",
753            self.connection_attempt,
754            CONN_RETRIES
755        );
756
757        // Token ownership: a fully-connected backend always owns a backend
758        // token (set by `set_back_token` during `connect_to_backend`, before
759        // the status can ever flip to `Connected`). The `Connecting` phase is
760        // deliberately excluded: there is a transient window inside
761        // `connect_to_backend` where the status is `Connecting` but the token
762        // has not been wired yet — that window never spans a `ready()`
763        // boundary, so the postcondition still holds here.
764        if self.backend_connected == BackendConnectionStatus::Connected {
765            debug_assert!(
766                self.backend_token.is_some(),
767                "a Connected backend must own a backend token"
768            );
769        }
770
771        // A live backend handle implies the matching token is present: the
772        // two are wired together in `connect_to_backend` and torn down
773        // together in `remove_backend` (which clears the token) — they must
774        // never drift apart. (For the pure-TCP proxy `backend` is currently
775        // always `None`, so this is a guard against a future regression that
776        // starts populating it without the token.)
777        if self.backend.is_some() {
778            debug_assert!(
779                self.backend_token.is_some(),
780                "a live backend handle must have a backend token"
781            );
782        }
783
784        // Once the session has been closed it is terminal: the backend has
785        // been released and the per-(cluster, source-IP) slot untracked.
786        if self.has_been_closed {
787            debug_assert!(
788                self.backend.is_none(),
789                "a closed session must have released its backend handle"
790            );
791            debug_assert!(
792                !self.cluster_ip_tracked,
793                "a closed session must have untracked its (cluster, source-IP) slot"
794            );
795        }
796    }
797
798    fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
799        let mut counter = 0;
800
801        let back_connected = self.back_connected();
802        if back_connected.is_connecting() {
803            if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
804                //retry connecting the backend
805                debug!(
806                    "{} error connecting to backend, trying again",
807                    log_context!(self)
808                );
809                self.connection_attempt += 1;
810                self.fail_backend_connection();
811
812                // trigger a backend reconnection
813                self.close_backend();
814                let connection_result = self.connect_to_backend(session.clone());
815                if let Err(err) = &connection_result {
816                    match err {
817                        // Already logged at warn! + metered at the retry-budget
818                        // gate in connect_to_backend; avoid double-emission.
819                        BackendConnectionError::MaxConnectionRetries(_) => trace!(
820                            "{} Error connecting to backend: {}",
821                            log_context!(self),
822                            err
823                        ),
824                        _ => warn!(
825                            "{} Error connecting to backend: {}",
826                            log_context!(self),
827                            err
828                        ),
829                    }
830                }
831
832                if let Some(state_result) = handle_connection_result(connection_result) {
833                    return state_result;
834                }
835            } else if self.back_readiness().unwrap().event != Ready::EMPTY {
836                self.connection_attempt = 0;
837                self.set_back_connected(BackendConnectionStatus::Connected);
838            }
839        } else if back_connected == BackendConnectionStatus::NotConnected {
840            let connection_result = self.connect_to_backend(session.clone());
841            if let Err(err) = &connection_result {
842                match err {
843                    BackendConnectionError::MaxConnectionRetries(_) => trace!(
844                        "{} Error connecting to backend: {}",
845                        log_context!(self),
846                        err
847                    ),
848                    _ => warn!(
849                        "{} Error connecting to backend: {}",
850                        log_context!(self),
851                        err
852                    ),
853                }
854            }
855
856            if let Some(state_result) = handle_connection_result(connection_result) {
857                return state_result;
858            }
859        }
860
861        if self.front_readiness().event.is_hup() {
862            let session_result = self.front_hup();
863            if session_result == SessionResult::Continue {
864                self.front_readiness().event.remove(Ready::HUP);
865            }
866            return session_result;
867        }
868
869        while counter < MAX_LOOP_ITERATIONS {
870            let front_interest = self.front_readiness().interest & self.front_readiness().event;
871            let back_interest = self
872                .back_readiness()
873                .map(|r| r.interest & r.event)
874                .unwrap_or(Ready::EMPTY);
875
876            trace!(
877                "{} Frontend interest({:?}) and backend interest({:?})",
878                log_context!(self),
879                front_interest,
880                back_interest
881            );
882
883            if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
884                break;
885            }
886
887            if self
888                .back_readiness()
889                .map(|r| r.event.is_hup())
890                .unwrap_or(false)
891                && self.front_readiness().interest.is_writable()
892                && !self.front_readiness().event.is_writable()
893            {
894                break;
895            }
896
897            if front_interest.is_readable() {
898                let session_result = self.readable();
899                if session_result != SessionResult::Continue {
900                    return session_result;
901                }
902            }
903
904            if back_interest.is_writable() {
905                let session_result = self.back_writable();
906                if session_result != SessionResult::Continue {
907                    return session_result;
908                }
909            }
910
911            if back_interest.is_readable() {
912                let session_result = self.back_readable();
913                if session_result != SessionResult::Continue {
914                    return session_result;
915                }
916            }
917
918            if front_interest.is_writable() {
919                let session_result = self.writable();
920                if session_result != SessionResult::Continue {
921                    return session_result;
922                }
923            }
924
925            if back_interest.is_hup() {
926                let session_result = self.back_hup();
927                if session_result != SessionResult::Continue {
928                    return session_result;
929                }
930            }
931
932            if front_interest.is_error() {
933                error!(
934                    "{} Frontend socket error, disconnecting",
935                    log_context!(self)
936                );
937                self.front_readiness().interest = Ready::EMPTY;
938                if let Some(r) = self.back_readiness() {
939                    r.interest = Ready::EMPTY;
940                }
941
942                return SessionResult::Close;
943            }
944
945            if back_interest.is_error() && self.back_hup() == SessionResult::Close {
946                self.front_readiness().interest = Ready::EMPTY;
947                if let Some(r) = self.back_readiness() {
948                    r.interest = Ready::EMPTY;
949                }
950
951                error!("{} backend socket error, disconnecting", log_context!(self));
952                return SessionResult::Close;
953            }
954
955            counter += 1;
956        }
957
958        if counter >= MAX_LOOP_ITERATIONS {
959            error!(
960                "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
961                log_context!(self),
962                MAX_LOOP_ITERATIONS
963            );
964
965            incr!(names::tcp::INFINITE_LOOP_ERROR);
966
967            let front_interest = self.front_readiness().interest & self.front_readiness().event;
968            let back_interest = self
969                .back_readiness()
970                .map(|r| r.interest & r.event)
971                .unwrap_or(Ready::EMPTY);
972
973            let back = self.back_readiness().cloned();
974
975            error!(
976                "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
977                log_context!(self),
978                self.front_readiness(),
979                back,
980                front_interest,
981                back_interest
982            );
983
984            self.print_session();
985
986            return SessionResult::Close;
987        }
988
989        SessionResult::Continue
990    }
991
992    /// TCP session closes its backend on its own, without defering this task to the state
993    fn close_backend(&mut self) {
994        if let (Some(token), Some(fd)) = (
995            self.backend_token,
996            self.back_socket_mut().map(|s| s.as_raw_fd()),
997        ) {
998            let proxy = self.proxy.borrow();
999            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
1000                error!(
1001                    "{} Error deregistering socket({:?}): {:?}",
1002                    log_context!(self),
1003                    fd,
1004                    e
1005                );
1006            }
1007
1008            proxy.sessions.borrow_mut().slab.try_remove(token.0);
1009        }
1010        self.remove_backend();
1011
1012        let back_connected = self.back_connected();
1013        if back_connected != BackendConnectionStatus::NotConnected {
1014            if let Some(r) = self.back_readiness() {
1015                r.event = Ready::EMPTY;
1016            }
1017
1018            let log_context = log_context!(self);
1019            if let Some(sock) = self.back_socket_mut() {
1020                // TCP-only backend in the pure-TCP proxy: no outbound TLS
1021                // buffer to truncate, so `Shutdown::Both` is the right call.
1022                // If the TCP listener ever gains an inline TLS upgrade,
1023                // switch to `Shutdown::Write` here.
1024                if let Err(e) = sock.shutdown(Shutdown::Both) {
1025                    if e.kind() != ErrorKind::NotConnected {
1026                        error!(
1027                            "{} Error closing back socket({:?}): {:?}",
1028                            log_context, sock, e
1029                        );
1030                    }
1031                }
1032            }
1033        }
1034
1035        // The -1 here pairs with the +1 in `set_back_connected(Connected)`:
1036        // we decrement the gauge exactly once, iff this session had actually
1037        // reached `Connected`. A `Connecting`/`NotConnected` backend never
1038        // bumped the gauge, so it must not decrement it either — that
1039        // asymmetry would underflow the gauge (a correctness bug, never a
1040        // rounding issue).
1041        if back_connected == BackendConnectionStatus::Connected {
1042            gauge_add!(names::backend::CONNECTIONS, -1);
1043            gauge_add!(
1044                names::backend::CONNECTIONS_PER_BACKEND,
1045                -1,
1046                self.cluster_id.as_deref(),
1047                self.metrics.backend_id.as_deref()
1048            );
1049        }
1050
1051        self.set_back_connected(BackendConnectionStatus::NotConnected);
1052
1053        // Postcondition: the backend is fully torn down — `remove_backend`
1054        // cleared the token/handle above and the status is now `NotConnected`,
1055        // so a subsequent `connect_to_backend` starts from a clean slate.
1056        debug_assert_eq!(
1057            self.backend_connected,
1058            BackendConnectionStatus::NotConnected,
1059            "close_backend must leave the backend NotConnected"
1060        );
1061        debug_assert!(
1062            self.backend_token.is_none(),
1063            "close_backend must clear the backend token"
1064        );
1065    }
1066
1067    fn connect_to_backend(
1068        &mut self,
1069        session_rc: Rc<RefCell<dyn ProxySession>>,
1070    ) -> Result<BackendConnectAction, BackendConnectionError> {
1071        // Precondition: the retry budget can sit AT the ceiling (the gate
1072        // below converts that into `MaxConnectionRetries`) but the increment
1073        // in `ready_inner` must never have pushed it past `CONN_RETRIES`.
1074        debug_assert!(
1075            self.connection_attempt <= CONN_RETRIES,
1076            "connection_attempt ({}) overflowed CONN_RETRIES ({}) before the retry gate",
1077            self.connection_attempt,
1078            CONN_RETRIES
1079        );
1080
1081        let cluster_id = self
1082            .listener
1083            .borrow()
1084            .cluster_id
1085            .clone()
1086            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
1087
1088        self.cluster_id = Some(cluster_id.clone());
1089
1090        if self.connection_attempt >= CONN_RETRIES {
1091            incr!(
1092                "backend.connect.retries_exhausted",
1093                self.cluster_id.as_deref(),
1094                self.metrics.backend_id.as_deref()
1095            );
1096            warn!(
1097                "{} Max connection attempt reached ({})",
1098                log_context!(self),
1099                self.connection_attempt
1100            );
1101            return Err(BackendConnectionError::MaxConnectionRetries(Some(
1102                cluster_id,
1103            )));
1104        }
1105
1106        if self.proxy.borrow().sessions.borrow().at_capacity() {
1107            return Err(BackendConnectionError::MaxSessionsMemory);
1108        }
1109
1110        // Per-(cluster, source-IP) connection limit gate (TCP). The
1111        // source IP comes from `effective_session_address`, which folds
1112        // a parsed PROXY-v2 source over the raw `peer_addr`. The mux's
1113        // Router does the same gate for HTTP/HTTPS sessions; here it
1114        // runs for raw TCP. Rejection produces a graceful TCP FIN via
1115        // `BackendConnectionError::TooManyConnectionsPerIp` →
1116        // `handle_connection_result` → `SessionResult::Close` — TCP has
1117        // no HTTP envelope to carry a 429 / `Retry-After`.
1118        let cluster_max_connections_per_ip = self
1119            .proxy
1120            .borrow()
1121            .configs
1122            .get(&cluster_id)
1123            .and_then(|c| c.max_connections_per_ip);
1124        if let Some(ip) = self.effective_session_address().map(|sa| sa.ip()) {
1125            let sessions_rc = self.proxy.borrow().sessions.clone();
1126            let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
1127                self.frontend_token,
1128                &cluster_id,
1129                &ip,
1130                cluster_max_connections_per_ip,
1131            );
1132            if at_limit {
1133                debug!(
1134                    "{} per-(cluster, source-IP) limit hit for cluster {} from {}",
1135                    log_context!(self),
1136                    cluster_id,
1137                    ip
1138                );
1139                return Err(BackendConnectionError::TooManyConnectionsPerIp { cluster_id });
1140            }
1141            sessions_rc
1142                .borrow_mut()
1143                .track_cluster_ip(self.frontend_token, cluster_id.clone(), ip);
1144            self.cluster_ip_tracked = true;
1145        }
1146
1147        let (backend, mut stream) = self
1148            .proxy
1149            .borrow()
1150            .backends
1151            .borrow_mut()
1152            .backend_from_cluster_id(&cluster_id)
1153            .map_err(BackendConnectionError::Backend)?;
1154
1155        if let Err(e) = stream.set_nodelay(true) {
1156            error!(
1157                "{} Error setting nodelay on back socket({:?}): {:?}",
1158                log_context!(self),
1159                stream,
1160                e
1161            );
1162        }
1163        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
1164
1165        let back_token = {
1166            let proxy = self.proxy.borrow();
1167            let mut s = proxy.sessions.borrow_mut();
1168            let entry = s.slab.vacant_entry();
1169            let back_token = Token(entry.key());
1170            let _entry = entry.insert(session_rc.clone());
1171            back_token
1172        };
1173
1174        if let Err(e) = self.proxy.borrow().registry.register(
1175            &mut stream,
1176            back_token,
1177            Interest::READABLE | Interest::WRITABLE,
1178        ) {
1179            error!(
1180                "{} Error registering back socket({:?}): {:?}",
1181                log_context!(self),
1182                stream,
1183                e
1184            );
1185        }
1186
1187        self.container_backend_timeout.set(back_token);
1188
1189        self.set_back_token(back_token);
1190        self.set_back_socket(stream);
1191
1192        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
1193        self.metrics.backend_start();
1194        self.set_backend_id(backend.borrow().backend_id.clone());
1195
1196        // Postcondition of a successful New connect: the session is wired to
1197        // its freshly-registered backend token and the status reflects an
1198        // in-flight handshake (`Connecting`). The promotion to `Connected`
1199        // happens later in `ready_inner` once the socket signals writable.
1200        debug_assert!(
1201            self.backend_token.is_some(),
1202            "a New backend connection must own its backend token"
1203        );
1204        debug_assert!(
1205            self.backend_connected.is_connecting(),
1206            "a New backend connection must be in the Connecting state"
1207        );
1208
1209        Ok(BackendConnectAction::New)
1210    }
1211}
1212
1213impl ProxySession for TcpSession {
1214    fn close(&mut self) {
1215        if self.has_been_closed {
1216            return;
1217        }
1218
1219        // Past the idempotency guard the session is closing for the first
1220        // time: every gauge-restore / untrack below must run exactly once, so
1221        // re-entry on an already-closed session would double-decrement.
1222        debug_assert!(
1223            !self.has_been_closed,
1224            "close() body must only run on a not-yet-closed session"
1225        );
1226
1227        // TODO: the state should handle the timeouts
1228        trace!("{} Closing TCP session", log_context!(self));
1229        self.metrics.service_stop();
1230
1231        // Drain the per-(cluster, source-IP) accounting before any
1232        // early-return path below. The fail / non-fail close branches
1233        // both count, and the SessionManager-side untrack is idempotent
1234        // (no-op when the slot was never tracked) so this is safe even
1235        // when `cluster_ip_tracked` is false.
1236        if self.cluster_ip_tracked {
1237            self.proxy
1238                .borrow()
1239                .sessions
1240                .borrow_mut()
1241                .untrack_all_cluster_ip(self.frontend_token);
1242            self.cluster_ip_tracked = false;
1243        }
1244
1245        // Restore gauges
1246        match self.state.marker() {
1247            StateMarker::Pipe => gauge_add!(names::protocol::TCP, -1),
1248            StateMarker::SendProxyProtocol => gauge_add!(names::protocol::PROXY_SEND, -1),
1249            StateMarker::RelayProxyProtocol => gauge_add!(names::protocol::PROXY_RELAY, -1),
1250            StateMarker::ExpectProxyProtocol => gauge_add!(names::protocol::PROXY_EXPECT, -1),
1251        }
1252
1253        if self.state.failed() {
1254            match self.state.marker() {
1255                StateMarker::Pipe => incr!(names::tcp::UPGRADE_PIPE_FAILED),
1256                StateMarker::SendProxyProtocol => incr!(names::tcp::UPGRADE_SEND_FAILED),
1257                StateMarker::RelayProxyProtocol => incr!(names::tcp::UPGRADE_RELAY_FAILED),
1258                StateMarker::ExpectProxyProtocol => incr!(names::tcp::UPGRADE_EXPECT_FAILED),
1259            }
1260            return;
1261        }
1262
1263        self.cancel_timeouts();
1264
1265        let front_socket = self.state.front_socket();
1266        // TCP listener is plaintext at this layer — `Shutdown::Both` does not
1267        // truncate any TLS write buffer, so the canonical anti-pattern
1268        // (forces a TCP RST on the read direction, dropping in-flight bytes)
1269        // does not apply. Move to `Shutdown::Write` if a TLS upgrade ever
1270        // wraps this listener.
1271        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
1272            // error 107 NotConnected can happen when was never fully connected, or was already disconnected due to error
1273            if e.kind() != ErrorKind::NotConnected {
1274                error!(
1275                    "{} Error shutting down front socket({:?}): {:?}",
1276                    log_context!(self),
1277                    front_socket,
1278                    e
1279                );
1280            }
1281        }
1282
1283        // deregister the frontend and remove it, in a separate scope to drop proxy when done
1284        {
1285            let proxy = self.proxy.borrow();
1286            let fd = front_socket.as_raw_fd();
1287            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
1288                error!(
1289                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
1290                    log_context!(self),
1291                    fd,
1292                    e
1293                );
1294            }
1295            proxy
1296                .sessions
1297                .borrow_mut()
1298                .slab
1299                .try_remove(self.frontend_token.0);
1300        }
1301
1302        self.close_backend();
1303        self.has_been_closed = true;
1304
1305        // Postcondition of the normal close path: the session is terminal and
1306        // every accounting slot has been released — `close_backend` cleared
1307        // the backend token, and the per-(cluster, source-IP) untrack above
1308        // reset the flag. The idempotency guard now short-circuits any repeat.
1309        debug_assert!(self.has_been_closed, "close() must mark the session closed");
1310        debug_assert!(
1311            self.backend_token.is_none(),
1312            "close() must leave no dangling backend token"
1313        );
1314        debug_assert!(
1315            !self.cluster_ip_tracked,
1316            "close() must untrack the (cluster, source-IP) slot"
1317        );
1318    }
1319
1320    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
1321        // The frontend and backend slots are distinct tokens, so the two
1322        // dispatch arms below are mutually exclusive — a single token can
1323        // never match both. (Obsolete tokens matching neither are tolerated
1324        // and fall through to the `false` arm.)
1325        debug_assert!(
1326            self.backend_token != Some(self.frontend_token),
1327            "frontend and backend tokens must never collide"
1328        );
1329        if self.frontend_token == token {
1330            self.container_frontend_timeout.triggered();
1331            return true;
1332        }
1333        if self.backend_token == Some(token) {
1334            self.container_backend_timeout.triggered();
1335            return true;
1336        }
1337        // invalid token, obsolete timeout triggered
1338        false
1339    }
1340
1341    fn protocol(&self) -> Protocol {
1342        Protocol::TCP
1343    }
1344
1345    fn update_readiness(&mut self, token: Token, events: Ready) {
1346        trace!(
1347            "{} token {:?} got event {}",
1348            log_context!(self),
1349            token,
1350            super::ready_to_string(events)
1351        );
1352
1353        self.last_event = Instant::now();
1354        self.metrics.wait_start();
1355
1356        if self.frontend_token == token {
1357            self.front_readiness().event = self.front_readiness().event | events;
1358        } else if self.backend_token == Some(token) {
1359            if let Some(r) = self.back_readiness() {
1360                r.event |= events;
1361            }
1362        }
1363    }
1364
1365    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1366        self.metrics.service_start();
1367
1368        let session_result = self.ready_inner(session.clone());
1369
1370        let to_bo_closed = match session_result {
1371            SessionResult::Close => true,
1372            SessionResult::Continue => false,
1373            SessionResult::Upgrade => match self.upgrade() {
1374                false => self.ready(session),
1375                true => true,
1376            },
1377        };
1378
1379        self.metrics.service_stop();
1380
1381        // Run-to-completion postcondition: the front/back token + readiness
1382        // state machine must satisfy its cross-field invariants after every
1383        // `ready()` pass. Cfg-guarded so the call (and `check_invariants`
1384        // itself) is absent from release builds.
1385        #[cfg(debug_assertions)]
1386        self.check_invariants();
1387
1388        to_bo_closed
1389    }
1390
1391    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1392        true
1393    }
1394
1395    fn last_event(&self) -> Instant {
1396        self.last_event
1397    }
1398
1399    fn print_session(&self) {
1400        let state: String = match &self.state {
1401            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
1402            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
1403            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
1404            TcpStateMachine::Pipe(_) => String::from("TCP"),
1405            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
1406        };
1407
1408        let front_readiness = match &self.state {
1409            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
1410            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
1411            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
1412            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
1413            TcpStateMachine::FailedUpgrade(_) => None,
1414        };
1415
1416        let back_readiness = match &self.state {
1417            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
1418            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
1419            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
1420            TcpStateMachine::ExpectProxyProtocol(_) => None,
1421            TcpStateMachine::FailedUpgrade(_) => None,
1422        };
1423
1424        error!(
1425            "\
1426{} Session ({:?})
1427\tFrontend:
1428\t\ttoken: {:?}\treadiness: {:?}
1429\tBackend:
1430\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
1431            log_context!(self),
1432            state,
1433            self.frontend_token,
1434            front_readiness,
1435            self.backend_token,
1436            back_readiness,
1437            self.backend_connected,
1438            self.cluster_id
1439        );
1440        error!("Metrics: {:?}", self.metrics);
1441    }
1442
1443    fn frontend_token(&self) -> Token {
1444        self.frontend_token
1445    }
1446}
1447
1448pub struct TcpListener {
1449    active: SessionIsToBeClosed,
1450    address: SocketAddr,
1451    cluster_id: Option<String>,
1452    config: TcpListenerConfig,
1453    listener: Option<MioTcpListener>,
1454    tags: BTreeMap<String, CachedTags>,
1455    token: Token,
1456}
1457
1458impl ListenerHandler for TcpListener {
1459    fn get_addr(&self) -> &SocketAddr {
1460        &self.address
1461    }
1462
1463    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1464        self.tags.get(key)
1465    }
1466
1467    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1468        match tags {
1469            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1470            None => self.tags.remove(&key),
1471        };
1472    }
1473
1474    fn protocol(&self) -> Protocol {
1475        Protocol::TCP
1476    }
1477
1478    fn public_address(&self) -> SocketAddr {
1479        self.config
1480            .public_address
1481            .map(|addr| addr.into())
1482            .unwrap_or(self.address)
1483    }
1484}
1485
1486impl TcpListener {
1487    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1488        Ok(TcpListener {
1489            cluster_id: None,
1490            listener: None,
1491            token,
1492            address: config.address.into(),
1493            config,
1494            active: false,
1495            tags: BTreeMap::new(),
1496        })
1497    }
1498
1499    pub fn activate(
1500        &mut self,
1501        registry: &Registry,
1502        tcp_listener: Option<MioTcpListener>,
1503    ) -> Result<Token, ProxyError> {
1504        if self.active {
1505            return Ok(self.token);
1506        }
1507
1508        let mut listener = match tcp_listener {
1509            Some(listener) => listener,
1510            None => {
1511                let address = self.config.address.into();
1512                server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1513            }
1514        };
1515
1516        registry
1517            .register(&mut listener, self.token, Interest::READABLE)
1518            .map_err(ProxyError::RegisterListener)?;
1519
1520        self.listener = Some(listener);
1521        self.active = true;
1522        Ok(self.token)
1523    }
1524
1525    /// Apply a partial-update patch to this TCP listener's live configuration.
1526    ///
1527    /// Fields absent in the patch (i.e. `None`) are preserved unchanged.
1528    pub fn update_config(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), ListenerError> {
1529        if let Some(v) = patch.public_address {
1530            self.config.public_address = Some(v);
1531        }
1532        if let Some(v) = patch.expect_proxy {
1533            self.config.expect_proxy = v;
1534        }
1535        if let Some(v) = patch.front_timeout {
1536            self.config.front_timeout = v;
1537        }
1538        if let Some(v) = patch.back_timeout {
1539            self.config.back_timeout = v;
1540        }
1541        if let Some(v) = patch.connect_timeout {
1542            self.config.connect_timeout = v;
1543        }
1544        Ok(())
1545    }
1546}
1547
1548fn handle_connection_result(
1549    connection_result: Result<BackendConnectAction, BackendConnectionError>,
1550) -> Option<SessionResult> {
1551    match connection_result {
1552        // reuse connection or send a default answer, we can continue
1553        Ok(BackendConnectAction::Reuse) => None,
1554        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1555            // we must wait for an event
1556            Some(SessionResult::Continue)
1557        }
1558        Err(_) => {
1559            // in case of BackendConnectionError::Backend(BackendError::ConnectionFailures(..))
1560            // we may want to retry instead of closing
1561            Some(SessionResult::Close)
1562        }
1563    }
1564}
1565
/// Per-cluster settings kept by the TCP proxy, stored in `TcpProxy::configs`
/// under the cluster id.
#[derive(Debug)]
pub struct ClusterConfiguration {
    /// Proxy protocol setting of the cluster, if any. It is looked up when a
    /// session is created and handed to the new `TcpSession`.
    proxy_protocol: Option<ProxyProtocolConfig>,
    // Uncomment this when implementing new load balancing algorithms
    // load_balancing: LoadBalancingAlgorithms,
    /// Per-cluster override of the global per-(cluster, source-IP)
    /// connection limit. `None` inherits the global default,
    /// `Some(0)` is explicit "unlimited", `Some(n > 0)` overrides.
    /// Resolved against `SessionManager::effective_max_connections_per_ip`
    /// at admit time in `connect_to_backend`.
    pub max_connections_per_ip: Option<u64>,
}
1578
/// State of the TCP proxy: its listeners, the clusters attached to them and
/// the handles it shares with the rest of the worker.
pub struct TcpProxy {
    /// Cluster id mapped to the token of the listener that serves it.
    /// Filled by `add_tcp_front`, emptied by `remove_tcp_front`.
    fronts: HashMap<String, Token>,
    /// Shared map of backends.
    backends: Rc<RefCell<BackendMap>>,
    /// TCP listeners, keyed by the token they were added with.
    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
    /// Per-cluster configuration, keyed by cluster id.
    configs: HashMap<ClusterId, ClusterConfiguration>,
    /// mio registry used to register and deregister sockets.
    registry: Registry,
    /// Shared session manager; new sessions are inserted in its slab.
    sessions: Rc<RefCell<SessionManager>>,
    /// Shared buffer pool from which session buffers are checked out.
    pool: Rc<RefCell<Pool>>,
}
1588
1589impl TcpProxy {
1590    pub fn new(
1591        registry: Registry,
1592        sessions: Rc<RefCell<SessionManager>>,
1593        pool: Rc<RefCell<Pool>>,
1594        backends: Rc<RefCell<BackendMap>>,
1595    ) -> TcpProxy {
1596        TcpProxy {
1597            backends,
1598            listeners: HashMap::new(),
1599            configs: HashMap::new(),
1600            fronts: HashMap::new(),
1601            registry,
1602            sessions,
1603            pool,
1604        }
1605    }
1606
1607    pub fn add_listener(
1608        &mut self,
1609        config: TcpListenerConfig,
1610        token: Token,
1611    ) -> Result<Token, ProxyError> {
1612        match self.listeners.entry(token) {
1613            Entry::Vacant(entry) => {
1614                let tcp_listener =
1615                    TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1616                entry.insert(Rc::new(RefCell::new(tcp_listener)));
1617                Ok(token)
1618            }
1619            _ => Err(ProxyError::ListenerAlreadyPresent),
1620        }
1621    }
1622
1623    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1624        let len = self.listeners.len();
1625
1626        self.listeners.retain(|_, l| l.borrow().address != address);
1627        self.listeners.len() < len
1628    }
1629
1630    pub fn activate_listener(
1631        &self,
1632        addr: &SocketAddr,
1633        tcp_listener: Option<MioTcpListener>,
1634    ) -> Result<Token, ProxyError> {
1635        let listener = self
1636            .listeners
1637            .values()
1638            .find(|listener| listener.borrow().address == *addr)
1639            .ok_or(ProxyError::NoListenerFound(*addr))?;
1640
1641        listener.borrow_mut().activate(&self.registry, tcp_listener)
1642    }
1643
1644    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1645        self.listeners
1646            .values()
1647            .filter_map(|listener| {
1648                let mut owned = listener.borrow_mut();
1649                if let Some(listener) = owned.listener.take() {
1650                    // Reset `active` so a subsequent `activate()` re-binds
1651                    // instead of short-circuiting on the stale flag.
1652                    owned.active = false;
1653                    return Some((owned.address, listener));
1654                }
1655
1656                None
1657            })
1658            .collect()
1659    }
1660
1661    pub fn give_back_listener(
1662        &mut self,
1663        address: SocketAddr,
1664    ) -> Result<(Token, MioTcpListener), ProxyError> {
1665        let listener = self
1666            .listeners
1667            .values()
1668            .find(|listener| listener.borrow().address == address)
1669            .ok_or(ProxyError::NoListenerFound(address))?;
1670
1671        let mut owned = listener.borrow_mut();
1672
1673        let taken_listener = owned
1674            .listener
1675            .take()
1676            .ok_or(ProxyError::UnactivatedListener)?;
1677
1678        // Reset `active` so a subsequent `activate()` re-binds instead of
1679        // short-circuiting on the stale flag.
1680        owned.active = false;
1681
1682        Ok((owned.token, taken_listener))
1683    }
1684
1685    /// Apply a partial-update patch to the identified TCP listener.
1686    pub fn update_listener(&mut self, patch: UpdateTcpListenerConfig) -> Result<(), ProxyError> {
1687        let address: SocketAddr = patch.address.into();
1688        let listener = self
1689            .listeners
1690            .values()
1691            .find(|l| l.borrow().address == address)
1692            .ok_or(ProxyError::NoListenerFound(address))?;
1693        listener
1694            .borrow_mut()
1695            .update_config(&patch)
1696            .map_err(|listener_error| ProxyError::ListenerActivation {
1697                address,
1698                listener_error,
1699            })
1700    }
1701
1702    pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1703        let address = front.address.into();
1704
1705        let mut listener = self
1706            .listeners
1707            .values()
1708            .find(|l| l.borrow().address == address)
1709            .ok_or(ProxyError::NoListenerFound(address))?
1710            .borrow_mut();
1711
1712        self.fronts
1713            .insert(front.cluster_id.to_string(), listener.token);
1714        listener.set_tags(address.to_string(), Some(front.tags));
1715        listener.cluster_id = Some(front.cluster_id);
1716        Ok(())
1717    }
1718
1719    pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1720        let address = front.address.into();
1721
1722        let mut listener = match self
1723            .listeners
1724            .values()
1725            .find(|l| l.borrow().address == address)
1726        {
1727            Some(l) => l.borrow_mut(),
1728            None => return Err(ProxyError::NoListenerFound(address)),
1729        };
1730
1731        listener.set_tags(address.to_string(), None);
1732        if let Some(cluster_id) = listener.cluster_id.take() {
1733            self.fronts.remove(&cluster_id);
1734        }
1735        Ok(())
1736    }
1737}
1738
1739impl ProxyConfiguration for TcpProxy {
1740    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1741        let request_type = match message.content.request_type {
1742            Some(t) => t,
1743            None => return WorkerResponse::error(message.id, "Empty request"),
1744        };
1745        match request_type {
1746            RequestType::AddTcpFrontend(front) => {
1747                if let Err(err) = self.add_tcp_front(front) {
1748                    return WorkerResponse::error(message.id, err);
1749                }
1750
1751                WorkerResponse::ok(message.id)
1752            }
1753            RequestType::RemoveTcpFrontend(front) => {
1754                if let Err(err) = self.remove_tcp_front(front) {
1755                    return WorkerResponse::error(message.id, err);
1756                }
1757
1758                WorkerResponse::ok(message.id)
1759            }
1760            RequestType::SoftStop(_) => {
1761                info!(
1762                    "{} {} processing soft shutdown",
1763                    log_module_context!(),
1764                    message.id
1765                );
1766                let listeners: HashMap<_, _> = self.listeners.drain().collect();
1767                for (_, l) in listeners.iter() {
1768                    l.borrow_mut()
1769                        .listener
1770                        .take()
1771                        .map(|mut sock| self.registry.deregister(&mut sock));
1772                }
1773                WorkerResponse::processing(message.id)
1774            }
1775            RequestType::HardStop(_) => {
1776                info!("{} {} hard shutdown", log_module_context!(), message.id);
1777                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1778                for (_, l) in listeners.drain() {
1779                    l.borrow_mut()
1780                        .listener
1781                        .take()
1782                        .map(|mut sock| self.registry.deregister(&mut sock));
1783                }
1784                WorkerResponse::ok(message.id)
1785            }
1786            RequestType::Status(_) => {
1787                info!("{} {} status", log_module_context!(), message.id);
1788                WorkerResponse::ok(message.id)
1789            }
1790            RequestType::AddCluster(cluster) => {
1791                let config = ClusterConfiguration {
1792                    proxy_protocol: cluster
1793                        .proxy_protocol
1794                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1795                    //load_balancing: cluster.load_balancing,
1796                    max_connections_per_ip: cluster.max_connections_per_ip,
1797                };
1798                self.configs.insert(cluster.cluster_id, config);
1799                WorkerResponse::ok(message.id)
1800            }
1801            RequestType::RemoveCluster(cluster_id) => {
1802                self.configs.remove(&cluster_id);
1803                WorkerResponse::ok(message.id)
1804            }
1805            RequestType::RemoveListener(remove) => {
1806                if !self.remove_listener(remove.address.into()) {
1807                    WorkerResponse::error(
1808                        message.id,
1809                        format!("no TCP listener to remove at address {:?}", remove.address),
1810                    )
1811                } else {
1812                    WorkerResponse::ok(message.id)
1813                }
1814            }
1815            command => {
1816                debug!(
1817                    "{} {} unsupported message for TCP proxy, ignoring {:?}",
1818                    log_module_context!(),
1819                    message.id,
1820                    command
1821                );
1822                WorkerResponse::error(message.id, "unsupported message")
1823            }
1824        }
1825    }
1826
1827    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1828        let internal_token = Token(token.0);
1829        if let Some(listener) = self.listeners.get(&internal_token) {
1830            if let Some(tcp_listener) = &listener.borrow().listener {
1831                tcp_listener
1832                    .accept()
1833                    .map(|(frontend_sock, _)| frontend_sock)
1834                    .map_err(|e| match e.kind() {
1835                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
1836                        _ => {
1837                            error!("{} accept() IO error: {:?}", log_module_context!(), e);
1838                            AcceptError::IoError
1839                        }
1840                    })
1841            } else {
1842                Err(AcceptError::IoError)
1843            }
1844        } else {
1845            Err(AcceptError::IoError)
1846        }
1847    }
1848
1849    fn create_session(
1850        &mut self,
1851        mut frontend_sock: MioTcpStream,
1852        token: ListenToken,
1853        wait_time: Duration,
1854        proxy: Rc<RefCell<Self>>,
1855    ) -> Result<(), AcceptError> {
1856        let listener_token = Token(token.0);
1857
1858        let listener = self
1859            .listeners
1860            .get(&listener_token)
1861            .ok_or(AcceptError::IoError)?;
1862
1863        let owned = listener.borrow();
1864        let mut pool = self.pool.borrow_mut();
1865
1866        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1867            (Some(fb), Some(bb)) => (fb, bb),
1868            _ => {
1869                error!("{} could not get buffers from pool", log_module_context!());
1870                error!(
1871                    "{} Buffer capacity has been reached, stopping to accept new connections for now",
1872                    log_module_context!()
1873                );
1874                gauge!(names::accept_queue::BACKPRESSURE, 1);
1875                self.sessions.borrow_mut().can_accept = false;
1876
1877                return Err(AcceptError::BufferCapacityReached);
1878            }
1879        };
1880
1881        if owned.cluster_id.is_none() {
1882            error!(
1883                "{} listener at address {:?} has no linked cluster",
1884                log_module_context!(),
1885                owned.address
1886            );
1887            return Err(AcceptError::IoError);
1888        }
1889
1890        let proxy_protocol = self
1891            .configs
1892            .get(owned.cluster_id.as_ref().unwrap())
1893            .and_then(|c| c.proxy_protocol);
1894
1895        if let Err(e) = frontend_sock.set_nodelay(true) {
1896            error!(
1897                "{} error setting nodelay on front socket({:?}): {:?}",
1898                log_module_context!(),
1899                frontend_sock,
1900                e
1901            );
1902        }
1903
1904        let mut session_manager = self.sessions.borrow_mut();
1905        let entry = session_manager.slab.vacant_entry();
1906        let frontend_token = Token(entry.key());
1907
1908        if let Err(register_error) = self.registry.register(
1909            &mut frontend_sock,
1910            frontend_token,
1911            Interest::READABLE | Interest::WRITABLE,
1912        ) {
1913            error!(
1914                "{} error registering front socket({:?}): {:?}",
1915                log_module_context!(),
1916                frontend_sock,
1917                register_error
1918            );
1919            return Err(AcceptError::RegisterError);
1920        }
1921
1922        let session = TcpSession::new(
1923            back_buffer,
1924            None,
1925            owned.cluster_id.clone(),
1926            Duration::from_secs(owned.config.back_timeout as u64),
1927            Duration::from_secs(owned.config.connect_timeout as u64),
1928            Duration::from_secs(owned.config.front_timeout as u64),
1929            front_buffer,
1930            frontend_token,
1931            listener.clone(),
1932            proxy_protocol,
1933            proxy,
1934            frontend_sock,
1935            wait_time,
1936        );
1937        incr!(names::tcp::REQUESTS);
1938
1939        let session = Rc::new(RefCell::new(session));
1940        entry.insert(session);
1941
1942        Ok(())
1943    }
1944}
1945
1946pub mod testing {
1947    use crate::testing::*;
1948
1949    /// This is not directly used by Sōzu but is available for example and testing purposes
1950    pub fn start_tcp_worker(
1951        config: TcpListenerConfig,
1952        max_buffers: usize,
1953        buffer_size: usize,
1954        channel: ProxyChannel,
1955    ) -> anyhow::Result<()> {
1956        let address = config.address.into();
1957
1958        let ServerParts {
1959            event_loop,
1960            registry,
1961            sessions,
1962            pool,
1963            backends,
1964            client_scm_socket: _,
1965            server_scm_socket,
1966            server_config,
1967        } = prebuild_server(max_buffers, buffer_size, true)?;
1968
1969        let token = {
1970            let mut sessions = sessions.borrow_mut();
1971            let entry = sessions.slab.vacant_entry();
1972            let key = entry.key();
1973            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1974                protocol: Protocol::TCPListen,
1975            })));
1976            Token(key)
1977        };
1978
1979        let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1980        proxy
1981            .add_listener(config, token)
1982            .with_context(|| "Failed at creating adding the listener")?;
1983        proxy
1984            .activate_listener(&address, None)
1985            .with_context(|| "Failed at creating activating the listener")?;
1986
1987        let mut server = Server::new(
1988            event_loop,
1989            channel,
1990            server_scm_socket,
1991            sessions,
1992            pool,
1993            backends,
1994            None,
1995            None,
1996            Some(proxy),
1997            server_config,
1998            None,
1999            false,
2000        )
2001        .with_context(|| "Failed at creating server")?;
2002
2003        debug!("{} starting event loop", log_module_context!());
2004        server.run();
2005        debug!("{} ending event loop", log_module_context!());
2006        Ok(())
2007    }
2008}
2009
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        net::{Shutdown, TcpListener, TcpStream},
        str,
        sync::{
            Arc, Barrier,
            atomic::{AtomicBool, Ordering},
        },
        thread,
        time::Duration,
    };

    use sozu_command::{
        channel::Channel,
        config::ListenerBuilder,
        proto::command::{
            LoadBalancingParams, RequestTcpFrontend, SocketAddress, SoftStop, WorkerRequest,
            WorkerResponse, request::RequestType,
        },
    };

    use super::testing::start_tcp_worker;
    use crate::testing::*;

    /*
    #[test]
    #[cfg(target_pointer_width = "64")]
    fn size_test() {
      assert_size!(Pipe<mio::net::TcpStream>, 224);
      assert_size!(SendProxyProtocol<mio::net::TcpStream>, 144);
      assert_size!(RelayProxyProtocol<mio::net::TcpStream>, 152);
      assert_size!(ExpectProxyProtocol<mio::net::TcpStream>, 520);
      assert_size!(State, 528);
      // fails depending on the platform?
      //assert_size!(Session, 808);
    }*/

    /// Writes one request on the command channel, panicking with `failure`
    /// when the write does not succeed.
    fn send_request(
        command: &mut Channel<WorkerRequest, WorkerResponse>,
        id: &str,
        request: RequestType,
        failure: &str,
    ) {
        command
            .write_message(&WorkerRequest {
                id: id.to_owned(),
                content: request.into(),
            })
            .expect(failure);
    }

    /// Sends data through the proxy to an echo backend over several
    /// concurrent client connections and checks that each client reads back
    /// exactly what it wrote.
    #[test]
    fn round_trip() {
        setup_test_logger!();
        let barrier = Arc::new(Barrier::new(2));
        let test_finished = Arc::new(AtomicBool::new(false));

        let front_port1 = provide_port();
        let front_port2 = provide_port();

        let backend_port = start_server(barrier.clone(), test_finished.clone());
        let mut command =
            start_proxy(backend_port, front_port1, front_port2).expect("Could not start proxy");
        barrier.wait();

        let front_addr = format!("127.0.0.1:{front_port1}");

        let mut s1 = TcpStream::connect(&front_addr).expect("could not connect");
        s1.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s1");

        // s3 never sends anything: it is only opened, then shut down below.
        let s3 = TcpStream::connect(&front_addr).expect("could not connect");

        let mut s2 = TcpStream::connect(&front_addr).expect("could not connect");
        s2.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s2");

        s1.write_all(b"hello ").expect("could not write to s1");
        println!("s1 sent");

        s2.write_all(b"pouet pouet").expect("could not write to s2");
        println!("s2 sent");

        let mut res = [0; 128];
        s1.write_all(b"coucou").expect("could not write to s1");

        s3.shutdown(Shutdown::Both).expect("could not shutdown s3");

        let sz2 = s2
            .read(&mut res[..])
            .expect("could not read from socket s2");
        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);

        // Read in a loop: a single read() on a TCP stream is not
        // guaranteed to return all echoed data if the second write's
        // round trip (client → proxy → backend → proxy → client) is
        // still in flight when we poll.
        let expected = b"hello coucou";
        let mut total = 0;
        while total < expected.len() {
            let sz = s1
                .read(&mut res[total..])
                .expect("could not read from socket s1");
            assert!(sz > 0, "connection closed before receiving all data");
            total += sz;
        }
        println!(
            "s1 received again({}): {:?}",
            total,
            str::from_utf8(&res[..total])
        );
        assert_eq!(&res[..total], &expected[..]);

        // Signal the echo server to stop
        test_finished.store(true, Ordering::Relaxed);

        // Send SoftStop to the sozu worker so server.run() exits cleanly
        send_request(
            &mut command,
            "ID_SOFTSTOP",
            RequestType::SoftStop(SoftStop {}),
            "could not send SoftStop to sozu worker",
        );
    }

    /// Start an echo server on an ephemeral port.
    /// Returns the port the server is listening on.
    ///
    /// The accept loop starts once `barrier` is released. The accept loop and
    /// every client handler stop after `test_finished` is set, the next time
    /// they find nothing to accept or to read.
    fn start_server(barrier: Arc<Barrier>, test_finished: Arc<AtomicBool>) -> u16 {
        let listener =
            TcpListener::bind("127.0.0.1:0").expect("could not bind echo server listener");
        let port = listener
            .local_addr()
            .expect("could not get echo server local address")
            .port();

        // Non-blocking accept, so the loop can poll the stop flag.
        listener
            .set_nonblocking(true)
            .expect("could not set echo server listener to non-blocking");

        thread::spawn(move || {
            barrier.wait();
            let mut count: u8 = 0;
            loop {
                match listener.accept() {
                    Ok((mut stream, _)) => {
                        let finished = test_finished.clone();
                        thread::spawn(move || {
                            println!("got a new client: {count}");
                            // Bounded reads, so the handler can poll the stop flag.
                            stream
                                .set_read_timeout(Some(Duration::from_secs(2)))
                                .expect("could not set read timeout on echo client");
                            let mut buf = [0; 128];
                            loop {
                                match stream.read(&mut buf[..]) {
                                    // The peer closed the connection.
                                    Ok(0) => break,
                                    Ok(sz) => {
                                        println!(
                                            "ECHO[{count}] got \"{:?}\"",
                                            str::from_utf8(&buf[..sz])
                                        );
                                        stream
                                            .write_all(&buf[..sz])
                                            .expect("could not echo data back");
                                    }
                                    Err(ref e)
                                        if e.kind() == std::io::ErrorKind::WouldBlock
                                            || e.kind() == std::io::ErrorKind::TimedOut =>
                                    {
                                        if finished.load(Ordering::Relaxed) {
                                            println!("backend server stopping (client handler)");
                                            break;
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        });
                        count = count.wrapping_add(1);
                    }
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        if test_finished.load(Ordering::Relaxed) {
                            println!("backend server stopping (accept loop)");
                            break;
                        }
                        thread::sleep(Duration::from_millis(50));
                    }
                    Err(e) => {
                        println!("connection failed: {e:?}");
                    }
                }
            }
        });

        port
    }

    /// Start a sozu TCP proxy worker with the given backend and frontend ports.
    ///
    /// The worker runs in its own thread with a listener on `front_port1`.
    /// Four requests are then sent (a TCP frontend and a backend for each
    /// front port) and four responses are read before the command channel is
    /// returned.
    fn start_proxy(
        backend_port: u16,
        front_port1: u16,
        front_port2: u16,
    ) -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, front_port1))
            .to_tcp(None)
            .expect("could not create listener config");

        let (mut command, channel) =
            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
        let _jg = thread::spawn(move || {
            setup_test_logger!();
            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
        });

        command
            .blocking()
            .expect("could not set command channel to blocking");

        // Both fronts belong to the same cluster and share the same backend.
        let tcp_front = |port: u16| RequestTcpFrontend {
            cluster_id: "yolo".to_owned(),
            address: SocketAddress::new_v4(127, 0, 0, 1, port),
            ..Default::default()
        };
        let backend = || sozu_command::response::Backend {
            cluster_id: "yolo".to_owned(),
            backend_id: "yolo-0".to_owned(),
            address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
            load_balancing_parameters: Some(LoadBalancingParams::default()),
            sticky_id: None,
            backup: None,
        };

        send_request(
            &mut command,
            "ID_YOLO1",
            RequestType::AddTcpFrontend(tcp_front(front_port1)),
            "could not send AddTcpFrontend for front1",
        );
        send_request(
            &mut command,
            "ID_YOLO2",
            RequestType::AddBackend(backend().to_add_backend()),
            "could not send AddBackend for front1",
        );

        // NOTE(review): only a listener for `front_port1` is created above, so
        // the frontend request for `front_port2` presumably gets an error
        // response; confirm this is intended.
        send_request(
            &mut command,
            "ID_YOLO3",
            RequestType::AddTcpFrontend(tcp_front(front_port2)),
            "could not send AddTcpFrontend for front2",
        );
        send_request(
            &mut command,
            "ID_YOLO4",
            RequestType::AddBackend(backend().to_add_backend()),
            "could not send AddBackend for front2",
        );

        // One response is read for each of the four requests sent above.
        for _ in 0..4 {
            println!(
                "read_message: {:?}",
                command
                    .read_message()
                    .with_context(|| "could not read message")?
            );
        }

        Ok(command)
    }
}