1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 Interest, Registry, Token,
13 net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14 unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18 ObjectKind,
19 config::MAX_LOOP_ITERATIONS,
20 logging::{EndpointRecord, LogContext, ansi_palette},
21 proto::command::request::RequestType,
22};
23
24use crate::metrics::names;
25use crate::{
26 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
27 ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
28 Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
29 backends::{Backend, BackendMap},
30 pool::{Checkout, Pool},
31 protocol::{
32 Pipe,
33 pipe::WebSocketContext,
34 proxy_protocol::{
35 expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
36 },
37 },
38 retry::RetryPolicy,
39 server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
40 socket::{server_bind, stats::socket_rtt},
41 sozu_command::{
42 proto::command::{
43 Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
44 UpdateTcpListenerConfig, WorkerRequest, WorkerResponse,
45 },
46 ready::Ready,
47 state::ClusterId,
48 },
49 timer::TimeoutContainer,
50};
51
// States of a TCP session. Besides the variants listed below, this file relies on
// the `StateMachineBuilder!` expansion providing a `FailedUpgrade` variant and the
// `take()`, `marker()` (returning a `StateMarker`), `failed()` and `front_socket()`
// helpers — see their uses in `TcpSession`.
StateMachineBuilder! {
    enum TcpStateMachine {
        // Steady state: forwards bytes in both directions between the frontend and
        // backend sockets (owns both buffers).
        Pipe(Pipe<MioTcpStream, TcpListener>),
        // Used for `ProxyProtocolConfig::SendHeader`: only drives the backend side
        // (frontend reads are ignored in this state), then upgrades to `Pipe`.
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        // Used for `ProxyProtocolConfig::RelayHeader`: reads from the frontend and
        // writes to the backend, then upgrades to `Pipe`.
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        // Used for `ProxyProtocolConfig::ExpectHeader`: reads from the frontend only
        // and has no backend socket, then upgrades to `Pipe`.
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
64
/// Builds the colored log prefix of a TCP session:
/// `<ctx>\tTCP\tSession(frontend=<token>, backend=<token or "<none>">)\t >>>`.
///
/// `$self` must expose `log_context()`, `frontend_token` and `backend_token`.
macro_rules! log_context {
    ($self:expr) => {{
        let (accent, plain, muted, label, value) = ansi_palette();
        // Resolve the backend part up front: a token number, or a placeholder when no
        // backend socket is registered yet.
        let backend_label = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => String::from("<none>"),
        };
        format!(
            "{gray}{ctx}{reset}\t{open}TCP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset})\t >>>",
            open = accent,
            reset = plain,
            grey = muted,
            gray = label,
            white = value,
            ctx = $self.log_context(),
            frontend = $self.frontend_token.0,
            backend = backend_label,
        )
    }};
}
88
/// Builds the module-level (session-less) TCP log prefix: `TCP\t >>>`, with the
/// protocol name highlighted using the first two entries of the ANSI palette.
macro_rules! log_module_context {
    () => {{
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "{open}TCP{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
102
103pub struct TcpSession {
104 backend_buffer: Option<Checkout>,
105 backend_connected: BackendConnectionStatus,
106 backend_id: Option<String>,
107 backend_token: Option<Token>,
108 backend: Option<Rc<RefCell<Backend>>>,
109 cluster_id: Option<String>,
110 configured_backend_timeout: Duration,
111 connection_attempt: u8,
112 container_backend_timeout: TimeoutContainer,
113 container_frontend_timeout: TimeoutContainer,
114 frontend_address: Option<SocketAddr>,
115 frontend_buffer: Option<Checkout>,
116 frontend_token: Token,
117 has_been_closed: SessionIsToBeClosed,
118 last_event: Instant,
119 listener: Rc<RefCell<TcpListener>>,
120 metrics: SessionMetrics,
121 proxy: Rc<RefCell<TcpProxy>>,
122 request_id: Ulid,
123 state: TcpStateMachine,
124 cluster_ip_tracked: bool,
133}
134
135impl TcpSession {
136 #[allow(clippy::too_many_arguments)]
137 fn new(
138 backend_buffer: Checkout,
139 backend_id: Option<String>,
140 cluster_id: Option<String>,
141 configured_backend_timeout: Duration,
142 configured_connect_timeout: Duration,
143 configured_frontend_timeout: Duration,
144 frontend_buffer: Checkout,
145 frontend_token: Token,
146 listener: Rc<RefCell<TcpListener>>,
147 proxy_protocol: Option<ProxyProtocolConfig>,
148 proxy: Rc<RefCell<TcpProxy>>,
149 socket: MioTcpStream,
150 wait_time: Duration,
151 ) -> TcpSession {
152 let frontend_address = socket.peer_addr().ok();
153 let mut frontend_buffer_session = None;
154 let mut backend_buffer_session = None;
155
156 let request_id = Ulid::generate();
157
158 let container_frontend_timeout =
159 TimeoutContainer::new(configured_frontend_timeout, frontend_token);
160 let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);
161
162 let state = match proxy_protocol {
163 Some(ProxyProtocolConfig::RelayHeader) => {
164 backend_buffer_session = Some(backend_buffer);
165 gauge_add!(names::protocol::PROXY_RELAY, 1);
166 TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
167 socket,
168 frontend_token,
169 request_id,
170 None,
171 frontend_buffer,
172 ))
173 }
174 Some(ProxyProtocolConfig::ExpectHeader) => {
175 frontend_buffer_session = Some(frontend_buffer);
176 backend_buffer_session = Some(backend_buffer);
177 gauge_add!(names::protocol::PROXY_EXPECT, 1);
178 TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
179 container_frontend_timeout.clone(),
180 socket,
181 frontend_token,
182 request_id,
183 ))
184 }
185 Some(ProxyProtocolConfig::SendHeader) => {
186 frontend_buffer_session = Some(frontend_buffer);
187 backend_buffer_session = Some(backend_buffer);
188 gauge_add!(names::protocol::PROXY_SEND, 1);
189 TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
190 socket,
191 frontend_token,
192 request_id,
193 None,
194 ))
195 }
196 None => {
197 gauge_add!(names::protocol::TCP, 1);
198 let mut pipe = Pipe::new(
199 backend_buffer,
200 backend_id.clone(),
201 None,
202 None,
203 None,
204 None,
205 cluster_id.clone(),
206 frontend_buffer,
207 frontend_token,
208 socket,
209 listener.clone(),
210 Protocol::TCP,
211 request_id,
212 request_id,
213 frontend_address,
214 WebSocketContext::Tcp,
215 );
216 pipe.set_cluster_id(cluster_id.clone());
217 TcpStateMachine::Pipe(pipe)
218 }
219 };
220
221 let metrics = SessionMetrics::new(Some(wait_time));
222 TcpSession {
225 backend_buffer: backend_buffer_session,
226 backend_connected: BackendConnectionStatus::NotConnected,
227 backend_id,
228 backend_token: None,
229 backend: None,
230 cluster_id,
231 configured_backend_timeout,
232 connection_attempt: 0,
233 container_backend_timeout,
234 container_frontend_timeout,
235 frontend_address,
236 frontend_buffer: frontend_buffer_session,
237 frontend_token,
238 has_been_closed: false,
239 last_event: Instant::now(),
240 listener,
241 metrics,
242 proxy,
243 request_id,
244 state,
245 cluster_ip_tracked: false,
246 }
247 }
248
249 fn effective_session_address(&self) -> Option<SocketAddr> {
257 match &self.state {
258 TcpStateMachine::Pipe(pipe) => pipe.get_session_address(),
259 TcpStateMachine::ExpectProxyProtocol(epp) => {
260 epp.addresses.as_ref().and_then(|pa| pa.source())
261 }
262 TcpStateMachine::RelayProxyProtocol(rpp) => {
263 rpp.addresses.as_ref().and_then(|pa| pa.source())
264 }
265 TcpStateMachine::SendProxyProtocol(_) | TcpStateMachine::FailedUpgrade(_) => None,
266 }
267 .or(self.frontend_address)
268 }
269
270 fn log_request(&self) {
271 let listener = self.listener.borrow();
272 let context = self.log_context();
273 self.metrics.register_end_of_session(&context);
274 info_access!(
275 on_failure: { incr!(names::access_logs::UNSENT) },
276 message: None,
277 context,
278 session_address: self.frontend_address,
279 backend_address: None,
280 protocol: "TCP",
281 endpoint: EndpointRecord::Tcp,
282 tags: listener.get_tags(&listener.get_addr().to_string()),
283 client_rtt: socket_rtt(self.state.front_socket()),
284 server_rtt: None,
285 user_agent: None,
286 x_request_id: None,
287 tls_version: None,
291 tls_cipher: None,
292 tls_sni: None,
293 tls_alpn: None,
294 xff_chain: None,
295 service_time: self.metrics.service_time(),
296 response_time: self.metrics.backend_response_time(),
297 request_time: self.metrics.request_time(),
298 start_time_ns: self.metrics.start_wall_ns(),
299 bytes_in: self.metrics.bin,
300 bytes_out: self.metrics.bout,
301 otel: None,
302 );
303 }
304
305 fn front_hup(&mut self) -> SessionResult {
306 match &mut self.state {
307 TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
308 _ => {
309 self.log_request();
310 SessionResult::Close
311 }
312 }
313 }
314
315 fn back_hup(&mut self) -> SessionResult {
316 match &mut self.state {
317 TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
318 _ => {
319 self.log_request();
320 SessionResult::Close
321 }
322 }
323 }
324
325 fn log_context(&self) -> LogContext<'_> {
326 LogContext {
327 session_id: self.request_id,
328 request_id: Some(self.request_id),
329 cluster_id: self.cluster_id.as_deref(),
330 backend_id: self.backend_id.as_deref(),
331 }
332 }
333
334 fn readable(&mut self) -> SessionResult {
335 if !self.container_frontend_timeout.reset() {
336 error!(
337 "{} Could not reset frontend timeout on readable",
338 log_context!(self)
339 );
340 }
341 if self.backend_connected == BackendConnectionStatus::Connected
342 && !self.container_backend_timeout.reset()
343 {
344 error!(
345 "{} Could not reset backend timeout on readable",
346 log_context!(self)
347 );
348 }
349 match &mut self.state {
350 TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
351 TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
352 TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
353 TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
354 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
355 }
356 }
357
358 fn writable(&mut self) -> SessionResult {
359 match &mut self.state {
360 TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
361 _ => SessionResult::Continue,
362 }
363 }
364
365 fn back_readable(&mut self) -> SessionResult {
366 if !self.container_frontend_timeout.reset() {
367 error!(
368 "{} Could not reset frontend timeout on back_readable",
369 log_context!(self)
370 );
371 }
372 if !self.container_backend_timeout.reset() {
373 error!(
374 "{} Could not reset backend timeout on back_readable",
375 log_context!(self)
376 );
377 }
378
379 match &mut self.state {
380 TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
381 _ => SessionResult::Continue,
382 }
383 }
384
385 fn back_writable(&mut self) -> SessionResult {
386 match &mut self.state {
387 TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
388 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
389 TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
390 TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
391 TcpStateMachine::FailedUpgrade(_) => {
392 unreachable!()
393 }
394 }
395 }
396
397 fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
398 match &mut self.state {
399 TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
400 TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
401 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
402 TcpStateMachine::ExpectProxyProtocol(_) => None,
403 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
404 }
405 }
406
407 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
408 let new_state = match self.state.take() {
409 TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
410 TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
411 TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
412 TcpStateMachine::Pipe(_) => None,
413 TcpStateMachine::FailedUpgrade(_) => todo!(),
414 };
415
416 match new_state {
417 Some(state) => {
418 self.state = state;
419 false
420 } None => true,
423 }
424 }
425
426 fn upgrade_send(
427 &mut self,
428 send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
429 ) -> Option<TcpStateMachine> {
430 if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
431 let mut pipe = send_proxy_protocol.into_pipe(
432 self.frontend_buffer.take().unwrap(),
433 self.backend_buffer.take().unwrap(),
434 self.listener.clone(),
435 );
436
437 pipe.set_cluster_id(self.cluster_id.clone());
438 gauge_add!(names::protocol::PROXY_SEND, -1);
439 gauge_add!(names::protocol::TCP, 1);
440 return Some(TcpStateMachine::Pipe(pipe));
441 }
442
443 error!(
444 "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
445 log_context!(self)
446 );
447 None
448 }
449
450 fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
451 if self.backend_buffer.is_some() {
452 let mut pipe =
453 rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
454 pipe.set_cluster_id(self.cluster_id.clone());
455 gauge_add!(names::protocol::PROXY_RELAY, -1);
456 gauge_add!(names::protocol::TCP, 1);
457 return Some(TcpStateMachine::Pipe(pipe));
458 }
459
460 error!(
461 "{} Missing the backend buffer queue, we can't switch to a pipe",
462 log_context!(self)
463 );
464 None
465 }
466
467 fn upgrade_expect(
468 &mut self,
469 epp: ExpectProxyProtocol<MioTcpStream>,
470 ) -> Option<TcpStateMachine> {
471 if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
472 let mut pipe = epp.into_pipe(
473 self.frontend_buffer.take().unwrap(),
474 self.backend_buffer.take().unwrap(),
475 None,
476 None,
477 self.listener.clone(),
478 );
479
480 pipe.set_cluster_id(self.cluster_id.clone());
481 gauge_add!(names::protocol::PROXY_EXPECT, -1);
482 gauge_add!(names::protocol::TCP, 1);
483 return Some(TcpStateMachine::Pipe(pipe));
484 }
485
486 error!(
487 "{} Missing the backend buffer queue, we can't switch to a pipe",
488 log_context!(self)
489 );
490 None
491 }
492
493 fn front_readiness(&mut self) -> &mut Readiness {
494 match &mut self.state {
495 TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
496 TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
497 TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
498 TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
499 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
500 }
501 }
502
503 fn back_readiness(&mut self) -> Option<&mut Readiness> {
504 match &mut self.state {
505 TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
506 TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
507 TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
508 TcpStateMachine::ExpectProxyProtocol(_) => None,
509 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
510 }
511 }
512
513 fn set_back_socket(&mut self, socket: MioTcpStream) {
514 match &mut self.state {
515 TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
516 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
517 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
518 TcpStateMachine::ExpectProxyProtocol(_) => {
519 error!(
520 "{} We should not set the back socket for the expect proxy protocol",
521 log_context!(self)
522 );
523 panic!(
524 "{} We should not set the back socket for the expect proxy protocol",
525 log_context!(self)
526 );
527 }
528 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
529 }
530 }
531
532 fn set_back_token(&mut self, token: Token) {
533 debug_assert_ne!(
537 token, self.frontend_token,
538 "backend token must differ from the frontend token"
539 );
540 self.backend_token = Some(token);
541
542 match &mut self.state {
543 TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
544 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
545 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
546 TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
547 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
548 }
549
550 debug_assert_eq!(
554 self.backend_token,
555 Some(token),
556 "set_back_token must leave the session owning the registered token"
557 );
558 }
559
560 fn set_backend_id(&mut self, id: String) {
561 self.backend_id = Some(id.clone());
562 if let TcpStateMachine::Pipe(pipe) = &mut self.state {
563 pipe.set_backend_id(Some(id));
564 }
565 }
566
567 fn back_connected(&self) -> BackendConnectionStatus {
568 self.backend_connected
569 }
570
571 fn set_back_connected(&mut self, status: BackendConnectionStatus) {
572 let last = self.backend_connected;
573 debug_assert!(
580 status != BackendConnectionStatus::Connected
581 || last != BackendConnectionStatus::Connected,
582 "set_back_connected(Connected) must not run on an already-Connected backend (gauge would double-count)"
583 );
584 self.backend_connected = status;
585
586 debug_assert_eq!(
588 self.backend_connected, status,
589 "set_back_connected must record the requested status"
590 );
591
592 if status == BackendConnectionStatus::Connected {
593 gauge_add!(names::backend::CONNECTIONS, 1);
594 gauge_add!(
595 names::backend::CONNECTIONS_PER_BACKEND,
596 1,
597 self.cluster_id.as_deref(),
598 self.metrics.backend_id.as_deref()
599 );
600
601 self.container_backend_timeout
604 .set_duration(self.configured_backend_timeout);
605 self.container_frontend_timeout.reset();
606
607 if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
608 spp.set_back_connected(BackendConnectionStatus::Connected);
609 }
610
611 if let Some(backend) = self.backend.as_ref() {
612 let mut backend = backend.borrow_mut();
613
614 if backend.retry_policy.is_down() {
615 incr!(
616 "backend.up",
617 self.cluster_id.as_deref(),
618 self.metrics.backend_id.as_deref()
619 );
620 gauge!(
621 names::backend::AVAILABLE,
622 1,
623 self.cluster_id.as_deref(),
624 self.metrics.backend_id.as_deref()
625 );
626 info!(
627 "{} backend server {} at {} is up",
628 log_context!(self),
629 backend.backend_id,
630 backend.address
631 );
632 push_event(Event {
633 kind: EventKind::BackendUp as i32,
634 backend_id: Some(backend.backend_id.to_owned()),
635 address: Some(backend.address.into()),
636 cluster_id: None,
637 metric_detail: None,
638 });
639 }
640
641 if let BackendConnectionStatus::Connecting(start) = last {
642 backend.set_connection_time(Instant::now() - start);
643 }
644
645 backend.failures = 0;
647 backend.retry_policy.succeed();
648 }
649 }
650 }
651
652 fn remove_backend(&mut self) {
653 if let Some(backend) = self.backend.take() {
654 (*backend.borrow_mut()).dec_connections();
655 }
656
657 self.backend_token = None;
658
659 debug_assert!(
664 self.backend.is_none(),
665 "remove_backend must release the backend handle"
666 );
667 debug_assert!(
668 self.backend_token.is_none(),
669 "remove_backend must clear the backend token"
670 );
671 }
672
673 fn fail_backend_connection(&mut self) {
674 if let Some(backend) = self.backend.as_ref() {
675 let backend = &mut *backend.borrow_mut();
676 backend.failures += 1;
677
678 let already_unavailable = backend.retry_policy.is_down();
679 backend.retry_policy.fail();
680 incr!(
681 "backend.connections.error",
682 self.cluster_id.as_deref(),
683 self.metrics.backend_id.as_deref()
684 );
685 if !already_unavailable && backend.retry_policy.is_down() {
686 error!(
687 "{} backend server {} at {} is down",
688 log_context!(self),
689 backend.backend_id,
690 backend.address
691 );
692 incr!(
693 "backend.down",
694 self.cluster_id.as_deref(),
695 self.metrics.backend_id.as_deref()
696 );
697 gauge!(
698 names::backend::AVAILABLE,
699 0,
700 self.cluster_id.as_deref(),
701 self.metrics.backend_id.as_deref()
702 );
703
704 push_event(Event {
705 kind: EventKind::BackendDown as i32,
706 backend_id: Some(backend.backend_id.to_owned()),
707 address: Some(backend.address.into()),
708 cluster_id: None,
709 metric_detail: None,
710 });
711 }
712 }
713 }
714
715 pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
716 match self.back_socket_mut() {
717 Some(ref mut s) => {
718 let mut tmp = [0u8; 1];
719 let res = s.peek(&mut tmp[..]);
720
721 match res {
722 Ok(0) => false,
724 Ok(_) => true,
725 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
726 }
727 }
728 None => false,
729 }
730 }
731
732 pub fn cancel_timeouts(&mut self) {
733 self.container_frontend_timeout.cancel();
734 self.container_backend_timeout.cancel();
735 }
736
737 #[cfg(debug_assertions)]
745 fn check_invariants(&self) {
746 debug_assert!(
751 self.connection_attempt <= CONN_RETRIES,
752 "connection_attempt ({}) must never exceed CONN_RETRIES ({})",
753 self.connection_attempt,
754 CONN_RETRIES
755 );
756
757 if self.backend_connected == BackendConnectionStatus::Connected {
765 debug_assert!(
766 self.backend_token.is_some(),
767 "a Connected backend must own a backend token"
768 );
769 }
770
771 if self.backend.is_some() {
778 debug_assert!(
779 self.backend_token.is_some(),
780 "a live backend handle must have a backend token"
781 );
782 }
783
784 if self.has_been_closed {
787 debug_assert!(
788 self.backend.is_none(),
789 "a closed session must have released its backend handle"
790 );
791 debug_assert!(
792 !self.cluster_ip_tracked,
793 "a closed session must have untracked its (cluster, source-IP) slot"
794 );
795 }
796 }
797
798 fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
799 let mut counter = 0;
800
801 let back_connected = self.back_connected();
802 if back_connected.is_connecting() {
803 if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
804 debug!(
806 "{} error connecting to backend, trying again",
807 log_context!(self)
808 );
809 self.connection_attempt += 1;
810 self.fail_backend_connection();
811
812 self.close_backend();
814 let connection_result = self.connect_to_backend(session.clone());
815 if let Err(err) = &connection_result {
816 match err {
817 BackendConnectionError::MaxConnectionRetries(_) => trace!(
820 "{} Error connecting to backend: {}",
821 log_context!(self),
822 err
823 ),
824 _ => warn!(
825 "{} Error connecting to backend: {}",
826 log_context!(self),
827 err
828 ),
829 }
830 }
831
832 if let Some(state_result) = handle_connection_result(connection_result) {
833 return state_result;
834 }
835 } else if self.back_readiness().unwrap().event != Ready::EMPTY {
836 self.connection_attempt = 0;
837 self.set_back_connected(BackendConnectionStatus::Connected);
838 }
839 } else if back_connected == BackendConnectionStatus::NotConnected {
840 let connection_result = self.connect_to_backend(session.clone());
841 if let Err(err) = &connection_result {
842 match err {
843 BackendConnectionError::MaxConnectionRetries(_) => trace!(
844 "{} Error connecting to backend: {}",
845 log_context!(self),
846 err
847 ),
848 _ => warn!(
849 "{} Error connecting to backend: {}",
850 log_context!(self),
851 err
852 ),
853 }
854 }
855
856 if let Some(state_result) = handle_connection_result(connection_result) {
857 return state_result;
858 }
859 }
860
861 if self.front_readiness().event.is_hup() {
862 let session_result = self.front_hup();
863 if session_result == SessionResult::Continue {
864 self.front_readiness().event.remove(Ready::HUP);
865 }
866 return session_result;
867 }
868
869 while counter < MAX_LOOP_ITERATIONS {
870 let front_interest = self.front_readiness().interest & self.front_readiness().event;
871 let back_interest = self
872 .back_readiness()
873 .map(|r| r.interest & r.event)
874 .unwrap_or(Ready::EMPTY);
875
876 trace!(
877 "{} Frontend interest({:?}) and backend interest({:?})",
878 log_context!(self),
879 front_interest,
880 back_interest
881 );
882
883 if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
884 break;
885 }
886
887 if self
888 .back_readiness()
889 .map(|r| r.event.is_hup())
890 .unwrap_or(false)
891 && self.front_readiness().interest.is_writable()
892 && !self.front_readiness().event.is_writable()
893 {
894 break;
895 }
896
897 if front_interest.is_readable() {
898 let session_result = self.readable();
899 if session_result != SessionResult::Continue {
900 return session_result;
901 }
902 }
903
904 if back_interest.is_writable() {
905 let session_result = self.back_writable();
906 if session_result != SessionResult::Continue {
907 return session_result;
908 }
909 }
910
911 if back_interest.is_readable() {
912 let session_result = self.back_readable();
913 if session_result != SessionResult::Continue {
914 return session_result;
915 }
916 }
917
918 if front_interest.is_writable() {
919 let session_result = self.writable();
920 if session_result != SessionResult::Continue {
921 return session_result;
922 }
923 }
924
925 if back_interest.is_hup() {
926 let session_result = self.back_hup();
927 if session_result != SessionResult::Continue {
928 return session_result;
929 }
930 }
931
932 if front_interest.is_error() {
933 error!(
934 "{} Frontend socket error, disconnecting",
935 log_context!(self)
936 );
937 self.front_readiness().interest = Ready::EMPTY;
938 if let Some(r) = self.back_readiness() {
939 r.interest = Ready::EMPTY;
940 }
941
942 return SessionResult::Close;
943 }
944
945 if back_interest.is_error() && self.back_hup() == SessionResult::Close {
946 self.front_readiness().interest = Ready::EMPTY;
947 if let Some(r) = self.back_readiness() {
948 r.interest = Ready::EMPTY;
949 }
950
951 error!("{} backend socket error, disconnecting", log_context!(self));
952 return SessionResult::Close;
953 }
954
955 counter += 1;
956 }
957
958 if counter >= MAX_LOOP_ITERATIONS {
959 error!(
960 "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
961 log_context!(self),
962 MAX_LOOP_ITERATIONS
963 );
964
965 incr!(names::tcp::INFINITE_LOOP_ERROR);
966
967 let front_interest = self.front_readiness().interest & self.front_readiness().event;
968 let back_interest = self
969 .back_readiness()
970 .map(|r| r.interest & r.event)
971 .unwrap_or(Ready::EMPTY);
972
973 let back = self.back_readiness().cloned();
974
975 error!(
976 "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
977 log_context!(self),
978 self.front_readiness(),
979 back,
980 front_interest,
981 back_interest
982 );
983
984 self.print_session();
985
986 return SessionResult::Close;
987 }
988
989 SessionResult::Continue
990 }
991
992 fn close_backend(&mut self) {
994 if let (Some(token), Some(fd)) = (
995 self.backend_token,
996 self.back_socket_mut().map(|s| s.as_raw_fd()),
997 ) {
998 let proxy = self.proxy.borrow();
999 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
1000 error!(
1001 "{} Error deregistering socket({:?}): {:?}",
1002 log_context!(self),
1003 fd,
1004 e
1005 );
1006 }
1007
1008 proxy.sessions.borrow_mut().slab.try_remove(token.0);
1009 }
1010 self.remove_backend();
1011
1012 let back_connected = self.back_connected();
1013 if back_connected != BackendConnectionStatus::NotConnected {
1014 if let Some(r) = self.back_readiness() {
1015 r.event = Ready::EMPTY;
1016 }
1017
1018 let log_context = log_context!(self);
1019 if let Some(sock) = self.back_socket_mut() {
1020 if let Err(e) = sock.shutdown(Shutdown::Both) {
1025 if e.kind() != ErrorKind::NotConnected {
1026 error!(
1027 "{} Error closing back socket({:?}): {:?}",
1028 log_context, sock, e
1029 );
1030 }
1031 }
1032 }
1033 }
1034
1035 if back_connected == BackendConnectionStatus::Connected {
1042 gauge_add!(names::backend::CONNECTIONS, -1);
1043 gauge_add!(
1044 names::backend::CONNECTIONS_PER_BACKEND,
1045 -1,
1046 self.cluster_id.as_deref(),
1047 self.metrics.backend_id.as_deref()
1048 );
1049 }
1050
1051 self.set_back_connected(BackendConnectionStatus::NotConnected);
1052
1053 debug_assert_eq!(
1057 self.backend_connected,
1058 BackendConnectionStatus::NotConnected,
1059 "close_backend must leave the backend NotConnected"
1060 );
1061 debug_assert!(
1062 self.backend_token.is_none(),
1063 "close_backend must clear the backend token"
1064 );
1065 }
1066
1067 fn connect_to_backend(
1068 &mut self,
1069 session_rc: Rc<RefCell<dyn ProxySession>>,
1070 ) -> Result<BackendConnectAction, BackendConnectionError> {
1071 debug_assert!(
1075 self.connection_attempt <= CONN_RETRIES,
1076 "connection_attempt ({}) overflowed CONN_RETRIES ({}) before the retry gate",
1077 self.connection_attempt,
1078 CONN_RETRIES
1079 );
1080
1081 let cluster_id = self
1082 .listener
1083 .borrow()
1084 .cluster_id
1085 .clone()
1086 .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
1087
1088 self.cluster_id = Some(cluster_id.clone());
1089
1090 if self.connection_attempt >= CONN_RETRIES {
1091 incr!(
1092 "backend.connect.retries_exhausted",
1093 self.cluster_id.as_deref(),
1094 self.metrics.backend_id.as_deref()
1095 );
1096 warn!(
1097 "{} Max connection attempt reached ({})",
1098 log_context!(self),
1099 self.connection_attempt
1100 );
1101 return Err(BackendConnectionError::MaxConnectionRetries(Some(
1102 cluster_id,
1103 )));
1104 }
1105
1106 if self.proxy.borrow().sessions.borrow().at_capacity() {
1107 return Err(BackendConnectionError::MaxSessionsMemory);
1108 }
1109
1110 let cluster_max_connections_per_ip = self
1119 .proxy
1120 .borrow()
1121 .configs
1122 .get(&cluster_id)
1123 .and_then(|c| c.max_connections_per_ip);
1124 if let Some(ip) = self.effective_session_address().map(|sa| sa.ip()) {
1125 let sessions_rc = self.proxy.borrow().sessions.clone();
1126 let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
1127 self.frontend_token,
1128 &cluster_id,
1129 &ip,
1130 cluster_max_connections_per_ip,
1131 );
1132 if at_limit {
1133 debug!(
1134 "{} per-(cluster, source-IP) limit hit for cluster {} from {}",
1135 log_context!(self),
1136 cluster_id,
1137 ip
1138 );
1139 return Err(BackendConnectionError::TooManyConnectionsPerIp { cluster_id });
1140 }
1141 sessions_rc
1142 .borrow_mut()
1143 .track_cluster_ip(self.frontend_token, cluster_id.clone(), ip);
1144 self.cluster_ip_tracked = true;
1145 }
1146
1147 let (backend, mut stream) = self
1148 .proxy
1149 .borrow()
1150 .backends
1151 .borrow_mut()
1152 .backend_from_cluster_id(&cluster_id)
1153 .map_err(BackendConnectionError::Backend)?;
1154
1155 if let Err(e) = stream.set_nodelay(true) {
1156 error!(
1157 "{} Error setting nodelay on back socket({:?}): {:?}",
1158 log_context!(self),
1159 stream,
1160 e
1161 );
1162 }
1163 self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
1164
1165 let back_token = {
1166 let proxy = self.proxy.borrow();
1167 let mut s = proxy.sessions.borrow_mut();
1168 let entry = s.slab.vacant_entry();
1169 let back_token = Token(entry.key());
1170 let _entry = entry.insert(session_rc.clone());
1171 back_token
1172 };
1173
1174 if let Err(e) = self.proxy.borrow().registry.register(
1175 &mut stream,
1176 back_token,
1177 Interest::READABLE | Interest::WRITABLE,
1178 ) {
1179 error!(
1180 "{} Error registering back socket({:?}): {:?}",
1181 log_context!(self),
1182 stream,
1183 e
1184 );
1185 }
1186
1187 self.container_backend_timeout.set(back_token);
1188
1189 self.set_back_token(back_token);
1190 self.set_back_socket(stream);
1191
1192 self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
1193 self.metrics.backend_start();
1194 self.set_backend_id(backend.borrow().backend_id.clone());
1195
1196 debug_assert!(
1201 self.backend_token.is_some(),
1202 "a New backend connection must own its backend token"
1203 );
1204 debug_assert!(
1205 self.backend_connected.is_connecting(),
1206 "a New backend connection must be in the Connecting state"
1207 );
1208
1209 Ok(BackendConnectAction::New)
1210 }
1211}
1212
/// Event-loop facing behaviour of a TCP session: teardown, timeouts,
/// readiness bookkeeping and the `ready` entry point.
impl ProxySession for TcpSession {
    /// Tears the session down.
    ///
    /// Guarded by `has_been_closed`, which is only set once the full teardown
    /// has run. In order it:
    /// - stops the service metric;
    /// - releases the (cluster, source-IP) slot if one is tracked;
    /// - decrements the gauge of the current protocol stage.
    ///
    /// Unless `self.state.failed()` is true, it then also cancels timeouts,
    /// shuts down and deregisters the frontend socket, frees the frontend
    /// slab entry and closes the backend.
    ///
    /// NOTE(review): on the failed-state path the method returns right after
    /// counting the upgrade failure, without setting `has_been_closed`. A
    /// second call would therefore run the metric/gauge bookkeeping again.
    /// Confirm callers never close a failed session twice.
    fn close(&mut self) {
        if self.has_been_closed {
            return;
        }

        debug_assert!(
            !self.has_been_closed,
            "close() body must only run on a not-yet-closed session"
        );

        trace!("{} Closing TCP session", log_context!(self));
        self.metrics.service_stop();

        // Free the per-(cluster, source IP) slot; the flag is cleared so a
        // later pass cannot untrack it twice.
        if self.cluster_ip_tracked {
            self.proxy
                .borrow()
                .sessions
                .borrow_mut()
                .untrack_all_cluster_ip(self.frontend_token);
            self.cluster_ip_tracked = false;
        }

        // Undo the gauge of whichever protocol stage the session is in.
        match self.state.marker() {
            StateMarker::Pipe => gauge_add!(names::protocol::TCP, -1),
            StateMarker::SendProxyProtocol => gauge_add!(names::protocol::PROXY_SEND, -1),
            StateMarker::RelayProxyProtocol => gauge_add!(names::protocol::PROXY_RELAY, -1),
            StateMarker::ExpectProxyProtocol => gauge_add!(names::protocol::PROXY_EXPECT, -1),
        }

        // Failed upgrade: count which stage failed and skip the socket teardown.
        if self.state.failed() {
            match self.state.marker() {
                StateMarker::Pipe => incr!(names::tcp::UPGRADE_PIPE_FAILED),
                StateMarker::SendProxyProtocol => incr!(names::tcp::UPGRADE_SEND_FAILED),
                StateMarker::RelayProxyProtocol => incr!(names::tcp::UPGRADE_RELAY_FAILED),
                StateMarker::ExpectProxyProtocol => incr!(names::tcp::UPGRADE_EXPECT_FAILED),
            }
            return;
        }

        self.cancel_timeouts();

        let front_socket = self.state.front_socket();
        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
            // NotConnected (socket no longer connected) is tolerated silently;
            // every other shutdown failure is logged.
            if e.kind() != ErrorKind::NotConnected {
                error!(
                    "{} Error shutting down front socket({:?}): {:?}",
                    log_context!(self),
                    front_socket,
                    e
                );
            }
        }

        {
            // Scoped so the `proxy` borrow ends before `close_backend()` runs
            // (presumably it borrows the proxy as well — confirm).
            let proxy = self.proxy.borrow();
            let fd = front_socket.as_raw_fd();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }
            // Release the frontend token's slab slot.
            proxy
                .sessions
                .borrow_mut()
                .slab
                .try_remove(self.frontend_token.0);
        }

        self.close_backend();
        self.has_been_closed = true;

        debug_assert!(self.has_been_closed, "close() must mark the session closed");
        debug_assert!(
            self.backend_token.is_none(),
            "close() must leave no dangling backend token"
        );
        debug_assert!(
            !self.cluster_ip_tracked,
            "close() must untrack the (cluster, source-IP) slot"
        );
    }

    /// Handles a timer expiry for `token`.
    ///
    /// Marks the matching timeout container (frontend or backend) as triggered.
    /// Returns `true` (close the session) when `token` belongs to this session,
    /// and `false` otherwise.
    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
        debug_assert!(
            self.backend_token != Some(self.frontend_token),
            "frontend and backend tokens must never collide"
        );
        if self.frontend_token == token {
            self.container_frontend_timeout.triggered();
            return true;
        }
        if self.backend_token == Some(token) {
            self.container_backend_timeout.triggered();
            return true;
        }
        false
    }

    fn protocol(&self) -> Protocol {
        Protocol::TCP
    }

    /// Records that `events` fired on `token`.
    ///
    /// Updates `last_event`, starts the wait metric, and ORs `events` into the
    /// readiness of the matching socket. Events for a token that is neither
    /// the frontend nor the backend token are ignored.
    fn update_readiness(&mut self, token: Token, events: Ready) {
        trace!(
            "{} token {:?} got event {}",
            log_context!(self),
            token,
            super::ready_to_string(events)
        );

        self.last_event = Instant::now();
        self.metrics.wait_start();

        if self.frontend_token == token {
            self.front_readiness().event = self.front_readiness().event | events;
        } else if self.backend_token == Some(token) {
            if let Some(r) = self.back_readiness() {
                r.event |= events;
            }
        }
    }

    /// Runs the session state machine for pending readiness.
    ///
    /// On `Upgrade`, switches state via `upgrade()`. If `upgrade()` returns
    /// `false`, it re-enters `ready` recursively with the new state; a `true`
    /// result closes the session. Returns whether the session must be closed.
    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
        self.metrics.service_start();

        let session_result = self.ready_inner(session.clone());

        let to_bo_closed = match session_result {
            SessionResult::Close => true,
            SessionResult::Continue => false,
            SessionResult::Upgrade => match self.upgrade() {
                false => self.ready(session),
                true => true,
            },
        };

        self.metrics.service_stop();

        #[cfg(debug_assertions)]
        self.check_invariants();

        to_bo_closed
    }

    /// Always reports that the session should be closed when the worker shuts down.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        true
    }

    fn last_event(&self) -> Instant {
        self.last_event
    }

    /// Dumps the session's state, tokens, readiness and metrics at error level.
    fn print_session(&self) {
        let state: String = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
            TcpStateMachine::Pipe(_) => String::from("TCP"),
            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
        };

        let front_readiness = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        // The Expect stage has no backend readiness to report.
        let back_readiness = match &self.state {
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
            TcpStateMachine::ExpectProxyProtocol(_) => None,
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        error!(
            "\
{} Session ({:?})
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
            log_context!(self),
            state,
            self.frontend_token,
            front_readiness,
            self.backend_token,
            back_readiness,
            self.backend_connected,
            self.cluster_id
        );
        error!("Metrics: {:?}", self.metrics);
    }

    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
}
1447
1448pub struct TcpListener {
1449 active: SessionIsToBeClosed,
1450 address: SocketAddr,
1451 cluster_id: Option<String>,
1452 config: TcpListenerConfig,
1453 listener: Option<MioTcpListener>,
1454 tags: BTreeMap<String, CachedTags>,
1455 token: Token,
1456}
1457
1458impl ListenerHandler for TcpListener {
1459 fn get_addr(&self) -> &SocketAddr {
1460 &self.address
1461 }
1462
1463 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1464 self.tags.get(key)
1465 }
1466
1467 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1468 match tags {
1469 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1470 None => self.tags.remove(&key),
1471 };
1472 }
1473
1474 fn protocol(&self) -> Protocol {
1475 Protocol::TCP
1476 }
1477
1478 fn public_address(&self) -> SocketAddr {
1479 self.config
1480 .public_address
1481 .map(|addr| addr.into())
1482 .unwrap_or(self.address)
1483 }
1484}
1485
1486impl TcpListener {
1487 fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1488 Ok(TcpListener {
1489 cluster_id: None,
1490 listener: None,
1491 token,
1492 address: config.address.into(),
1493 config,
1494 active: false,
1495 tags: BTreeMap::new(),
1496 })
1497 }
1498
1499 pub fn activate(
1500 &mut self,
1501 registry: &Registry,
1502 tcp_listener: Option<MioTcpListener>,
1503 ) -> Result<Token, ProxyError> {
1504 if self.active {
1505 return Ok(self.token);
1506 }
1507
1508 let mut listener = match tcp_listener {
1509 Some(listener) => listener,
1510 None => {
1511 let address = self.config.address.into();
1512 server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1513 }
1514 };
1515
1516 registry
1517 .register(&mut listener, self.token, Interest::READABLE)
1518 .map_err(ProxyError::RegisterListener)?;
1519
1520 self.listener = Some(listener);
1521 self.active = true;
1522 Ok(self.token)
1523 }
1524
1525 pub fn update_config(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), ListenerError> {
1529 if let Some(v) = patch.public_address {
1530 self.config.public_address = Some(v);
1531 }
1532 if let Some(v) = patch.expect_proxy {
1533 self.config.expect_proxy = v;
1534 }
1535 if let Some(v) = patch.front_timeout {
1536 self.config.front_timeout = v;
1537 }
1538 if let Some(v) = patch.back_timeout {
1539 self.config.back_timeout = v;
1540 }
1541 if let Some(v) = patch.connect_timeout {
1542 self.config.connect_timeout = v;
1543 }
1544 Ok(())
1545 }
1546}
1547
1548fn handle_connection_result(
1549 connection_result: Result<BackendConnectAction, BackendConnectionError>,
1550) -> Option<SessionResult> {
1551 match connection_result {
1552 Ok(BackendConnectAction::Reuse) => None,
1554 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1555 Some(SessionResult::Continue)
1557 }
1558 Err(_) => {
1559 Some(SessionResult::Close)
1562 }
1563 }
1564}
1565
/// Per-cluster settings the TCP proxy stores from `AddCluster` requests.
#[derive(Debug)]
pub struct ClusterConfiguration {
    /// Proxy-protocol mode for the cluster, if any; read when sessions are created.
    proxy_protocol: Option<ProxyProtocolConfig>,
    /// Optional cap on connections per source IP.
    /// NOTE(review): `None` presumably means "no limit" — confirm against the enforcing code.
    pub max_connections_per_ip: Option<u64>,
}
1578
1579pub struct TcpProxy {
1580 fronts: HashMap<String, Token>,
1581 backends: Rc<RefCell<BackendMap>>,
1582 listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
1583 configs: HashMap<ClusterId, ClusterConfiguration>,
1584 registry: Registry,
1585 sessions: Rc<RefCell<SessionManager>>,
1586 pool: Rc<RefCell<Pool>>,
1587}
1588
1589impl TcpProxy {
1590 pub fn new(
1591 registry: Registry,
1592 sessions: Rc<RefCell<SessionManager>>,
1593 pool: Rc<RefCell<Pool>>,
1594 backends: Rc<RefCell<BackendMap>>,
1595 ) -> TcpProxy {
1596 TcpProxy {
1597 backends,
1598 listeners: HashMap::new(),
1599 configs: HashMap::new(),
1600 fronts: HashMap::new(),
1601 registry,
1602 sessions,
1603 pool,
1604 }
1605 }
1606
1607 pub fn add_listener(
1608 &mut self,
1609 config: TcpListenerConfig,
1610 token: Token,
1611 ) -> Result<Token, ProxyError> {
1612 match self.listeners.entry(token) {
1613 Entry::Vacant(entry) => {
1614 let tcp_listener =
1615 TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1616 entry.insert(Rc::new(RefCell::new(tcp_listener)));
1617 Ok(token)
1618 }
1619 _ => Err(ProxyError::ListenerAlreadyPresent),
1620 }
1621 }
1622
1623 pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1624 let len = self.listeners.len();
1625
1626 self.listeners.retain(|_, l| l.borrow().address != address);
1627 self.listeners.len() < len
1628 }
1629
1630 pub fn activate_listener(
1631 &self,
1632 addr: &SocketAddr,
1633 tcp_listener: Option<MioTcpListener>,
1634 ) -> Result<Token, ProxyError> {
1635 let listener = self
1636 .listeners
1637 .values()
1638 .find(|listener| listener.borrow().address == *addr)
1639 .ok_or(ProxyError::NoListenerFound(*addr))?;
1640
1641 listener.borrow_mut().activate(&self.registry, tcp_listener)
1642 }
1643
1644 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1645 self.listeners
1646 .values()
1647 .filter_map(|listener| {
1648 let mut owned = listener.borrow_mut();
1649 if let Some(listener) = owned.listener.take() {
1650 owned.active = false;
1653 return Some((owned.address, listener));
1654 }
1655
1656 None
1657 })
1658 .collect()
1659 }
1660
1661 pub fn give_back_listener(
1662 &mut self,
1663 address: SocketAddr,
1664 ) -> Result<(Token, MioTcpListener), ProxyError> {
1665 let listener = self
1666 .listeners
1667 .values()
1668 .find(|listener| listener.borrow().address == address)
1669 .ok_or(ProxyError::NoListenerFound(address))?;
1670
1671 let mut owned = listener.borrow_mut();
1672
1673 let taken_listener = owned
1674 .listener
1675 .take()
1676 .ok_or(ProxyError::UnactivatedListener)?;
1677
1678 owned.active = false;
1681
1682 Ok((owned.token, taken_listener))
1683 }
1684
1685 pub fn update_listener(&mut self, patch: UpdateTcpListenerConfig) -> Result<(), ProxyError> {
1687 let address: SocketAddr = patch.address.into();
1688 let listener = self
1689 .listeners
1690 .values()
1691 .find(|l| l.borrow().address == address)
1692 .ok_or(ProxyError::NoListenerFound(address))?;
1693 listener
1694 .borrow_mut()
1695 .update_config(&patch)
1696 .map_err(|listener_error| ProxyError::ListenerActivation {
1697 address,
1698 listener_error,
1699 })
1700 }
1701
1702 pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1703 let address = front.address.into();
1704
1705 let mut listener = self
1706 .listeners
1707 .values()
1708 .find(|l| l.borrow().address == address)
1709 .ok_or(ProxyError::NoListenerFound(address))?
1710 .borrow_mut();
1711
1712 self.fronts
1713 .insert(front.cluster_id.to_string(), listener.token);
1714 listener.set_tags(address.to_string(), Some(front.tags));
1715 listener.cluster_id = Some(front.cluster_id);
1716 Ok(())
1717 }
1718
1719 pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1720 let address = front.address.into();
1721
1722 let mut listener = match self
1723 .listeners
1724 .values()
1725 .find(|l| l.borrow().address == address)
1726 {
1727 Some(l) => l.borrow_mut(),
1728 None => return Err(ProxyError::NoListenerFound(address)),
1729 };
1730
1731 listener.set_tags(address.to_string(), None);
1732 if let Some(cluster_id) = listener.cluster_id.take() {
1733 self.fronts.remove(&cluster_id);
1734 }
1735 Ok(())
1736 }
1737}
1738
1739impl ProxyConfiguration for TcpProxy {
1740 fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1741 let request_type = match message.content.request_type {
1742 Some(t) => t,
1743 None => return WorkerResponse::error(message.id, "Empty request"),
1744 };
1745 match request_type {
1746 RequestType::AddTcpFrontend(front) => {
1747 if let Err(err) = self.add_tcp_front(front) {
1748 return WorkerResponse::error(message.id, err);
1749 }
1750
1751 WorkerResponse::ok(message.id)
1752 }
1753 RequestType::RemoveTcpFrontend(front) => {
1754 if let Err(err) = self.remove_tcp_front(front) {
1755 return WorkerResponse::error(message.id, err);
1756 }
1757
1758 WorkerResponse::ok(message.id)
1759 }
1760 RequestType::SoftStop(_) => {
1761 info!(
1762 "{} {} processing soft shutdown",
1763 log_module_context!(),
1764 message.id
1765 );
1766 let listeners: HashMap<_, _> = self.listeners.drain().collect();
1767 for (_, l) in listeners.iter() {
1768 l.borrow_mut()
1769 .listener
1770 .take()
1771 .map(|mut sock| self.registry.deregister(&mut sock));
1772 }
1773 WorkerResponse::processing(message.id)
1774 }
1775 RequestType::HardStop(_) => {
1776 info!("{} {} hard shutdown", log_module_context!(), message.id);
1777 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1778 for (_, l) in listeners.drain() {
1779 l.borrow_mut()
1780 .listener
1781 .take()
1782 .map(|mut sock| self.registry.deregister(&mut sock));
1783 }
1784 WorkerResponse::ok(message.id)
1785 }
1786 RequestType::Status(_) => {
1787 info!("{} {} status", log_module_context!(), message.id);
1788 WorkerResponse::ok(message.id)
1789 }
1790 RequestType::AddCluster(cluster) => {
1791 let config = ClusterConfiguration {
1792 proxy_protocol: cluster
1793 .proxy_protocol
1794 .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1795 max_connections_per_ip: cluster.max_connections_per_ip,
1797 };
1798 self.configs.insert(cluster.cluster_id, config);
1799 WorkerResponse::ok(message.id)
1800 }
1801 RequestType::RemoveCluster(cluster_id) => {
1802 self.configs.remove(&cluster_id);
1803 WorkerResponse::ok(message.id)
1804 }
1805 RequestType::RemoveListener(remove) => {
1806 if !self.remove_listener(remove.address.into()) {
1807 WorkerResponse::error(
1808 message.id,
1809 format!("no TCP listener to remove at address {:?}", remove.address),
1810 )
1811 } else {
1812 WorkerResponse::ok(message.id)
1813 }
1814 }
1815 command => {
1816 debug!(
1817 "{} {} unsupported message for TCP proxy, ignoring {:?}",
1818 log_module_context!(),
1819 message.id,
1820 command
1821 );
1822 WorkerResponse::error(message.id, "unsupported message")
1823 }
1824 }
1825 }
1826
1827 fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1828 let internal_token = Token(token.0);
1829 if let Some(listener) = self.listeners.get(&internal_token) {
1830 if let Some(tcp_listener) = &listener.borrow().listener {
1831 tcp_listener
1832 .accept()
1833 .map(|(frontend_sock, _)| frontend_sock)
1834 .map_err(|e| match e.kind() {
1835 ErrorKind::WouldBlock => AcceptError::WouldBlock,
1836 _ => {
1837 error!("{} accept() IO error: {:?}", log_module_context!(), e);
1838 AcceptError::IoError
1839 }
1840 })
1841 } else {
1842 Err(AcceptError::IoError)
1843 }
1844 } else {
1845 Err(AcceptError::IoError)
1846 }
1847 }
1848
1849 fn create_session(
1850 &mut self,
1851 mut frontend_sock: MioTcpStream,
1852 token: ListenToken,
1853 wait_time: Duration,
1854 proxy: Rc<RefCell<Self>>,
1855 ) -> Result<(), AcceptError> {
1856 let listener_token = Token(token.0);
1857
1858 let listener = self
1859 .listeners
1860 .get(&listener_token)
1861 .ok_or(AcceptError::IoError)?;
1862
1863 let owned = listener.borrow();
1864 let mut pool = self.pool.borrow_mut();
1865
1866 let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1867 (Some(fb), Some(bb)) => (fb, bb),
1868 _ => {
1869 error!("{} could not get buffers from pool", log_module_context!());
1870 error!(
1871 "{} Buffer capacity has been reached, stopping to accept new connections for now",
1872 log_module_context!()
1873 );
1874 gauge!(names::accept_queue::BACKPRESSURE, 1);
1875 self.sessions.borrow_mut().can_accept = false;
1876
1877 return Err(AcceptError::BufferCapacityReached);
1878 }
1879 };
1880
1881 if owned.cluster_id.is_none() {
1882 error!(
1883 "{} listener at address {:?} has no linked cluster",
1884 log_module_context!(),
1885 owned.address
1886 );
1887 return Err(AcceptError::IoError);
1888 }
1889
1890 let proxy_protocol = self
1891 .configs
1892 .get(owned.cluster_id.as_ref().unwrap())
1893 .and_then(|c| c.proxy_protocol);
1894
1895 if let Err(e) = frontend_sock.set_nodelay(true) {
1896 error!(
1897 "{} error setting nodelay on front socket({:?}): {:?}",
1898 log_module_context!(),
1899 frontend_sock,
1900 e
1901 );
1902 }
1903
1904 let mut session_manager = self.sessions.borrow_mut();
1905 let entry = session_manager.slab.vacant_entry();
1906 let frontend_token = Token(entry.key());
1907
1908 if let Err(register_error) = self.registry.register(
1909 &mut frontend_sock,
1910 frontend_token,
1911 Interest::READABLE | Interest::WRITABLE,
1912 ) {
1913 error!(
1914 "{} error registering front socket({:?}): {:?}",
1915 log_module_context!(),
1916 frontend_sock,
1917 register_error
1918 );
1919 return Err(AcceptError::RegisterError);
1920 }
1921
1922 let session = TcpSession::new(
1923 back_buffer,
1924 None,
1925 owned.cluster_id.clone(),
1926 Duration::from_secs(owned.config.back_timeout as u64),
1927 Duration::from_secs(owned.config.connect_timeout as u64),
1928 Duration::from_secs(owned.config.front_timeout as u64),
1929 front_buffer,
1930 frontend_token,
1931 listener.clone(),
1932 proxy_protocol,
1933 proxy,
1934 frontend_sock,
1935 wait_time,
1936 );
1937 incr!(names::tcp::REQUESTS);
1938
1939 let session = Rc::new(RefCell::new(session));
1940 entry.insert(session);
1941
1942 Ok(())
1943 }
1944}
1945
1946pub mod testing {
1947 use crate::testing::*;
1948
1949 pub fn start_tcp_worker(
1951 config: TcpListenerConfig,
1952 max_buffers: usize,
1953 buffer_size: usize,
1954 channel: ProxyChannel,
1955 ) -> anyhow::Result<()> {
1956 let address = config.address.into();
1957
1958 let ServerParts {
1959 event_loop,
1960 registry,
1961 sessions,
1962 pool,
1963 backends,
1964 client_scm_socket: _,
1965 server_scm_socket,
1966 server_config,
1967 } = prebuild_server(max_buffers, buffer_size, true)?;
1968
1969 let token = {
1970 let mut sessions = sessions.borrow_mut();
1971 let entry = sessions.slab.vacant_entry();
1972 let key = entry.key();
1973 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1974 protocol: Protocol::TCPListen,
1975 })));
1976 Token(key)
1977 };
1978
1979 let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1980 proxy
1981 .add_listener(config, token)
1982 .with_context(|| "Failed at creating adding the listener")?;
1983 proxy
1984 .activate_listener(&address, None)
1985 .with_context(|| "Failed at creating activating the listener")?;
1986
1987 let mut server = Server::new(
1988 event_loop,
1989 channel,
1990 server_scm_socket,
1991 sessions,
1992 pool,
1993 backends,
1994 None,
1995 None,
1996 Some(proxy),
1997 server_config,
1998 None,
1999 false,
2000 )
2001 .with_context(|| "Failed at creating server")?;
2002
2003 debug!("{} starting event loop", log_module_context!());
2004 server.run();
2005 debug!("{} ending event loop", log_module_context!());
2006 Ok(())
2007 }
2008}
2009
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        net::{Shutdown, TcpListener, TcpStream},
        str,
        sync::{
            Arc, Barrier,
            atomic::{AtomicBool, Ordering},
        },
        thread,
        time::Duration,
    };

    use sozu_command::{
        channel::Channel,
        config::ListenerBuilder,
        proto::command::{
            LoadBalancingParams, RequestTcpFrontend, SocketAddress, SoftStop, WorkerRequest,
            WorkerResponse, request::RequestType,
        },
    };

    use super::testing::start_tcp_worker;
    use crate::testing::*;

    /// Three clients connect through the proxy to an echo backend.
    ///
    /// Data sent on s1 and s2 must come back unchanged. s3 is shut down right
    /// away and must not disturb the others.
    #[test]
    fn round_trip() {
        setup_test_logger!();
        let barrier = Arc::new(Barrier::new(2));
        let test_finished = Arc::new(AtomicBool::new(false));

        let front_port1 = provide_port();
        let front_port2 = provide_port();

        let backend_port = start_server(barrier.clone(), test_finished.clone());
        let mut command =
            start_proxy(backend_port, front_port1, front_port2).expect("Could not start proxy");
        barrier.wait();

        let front_addr = format!("127.0.0.1:{front_port1}");

        let mut s1 = TcpStream::connect(&front_addr).expect("could not connect");
        s1.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s1");

        let s3 = TcpStream::connect(&front_addr).expect("could not connect");

        let mut s2 = TcpStream::connect(&front_addr).expect("could not connect");
        s2.set_read_timeout(Some(Duration::from_secs(5)))
            .expect("could not set read timeout on s2");

        s1.write_all(b"hello ").expect("could not write to s1");
        println!("s1 sent");

        s2.write_all(b"pouet pouet").expect("could not write to s2");
        println!("s2 sent");

        s1.write_all(b"coucou").expect("could not write to s1");

        s3.shutdown(Shutdown::Both).expect("could not shutdown s3");

        // TCP is a byte stream: an echo may come back split across several
        // reads, so both checks accumulate until the whole payload arrived.
        read_expected(&mut s2, b"pouet pouet", "s2");
        read_expected(&mut s1, b"hello coucou", "s1");

        test_finished.store(true, Ordering::Relaxed);

        command
            .write_message(&WorkerRequest {
                id: "ID_SOFTSTOP".to_owned(),
                content: RequestType::SoftStop(SoftStop {}).into(),
            })
            .expect("could not send SoftStop to sozu worker");
    }

    /// Reads from `stream` until at least `expected.len()` bytes arrived and
    /// asserts they equal `expected`. Extra bytes make the comparison fail.
    fn read_expected(stream: &mut TcpStream, expected: &[u8], name: &str) {
        let mut res = [0u8; 128];
        assert!(
            expected.len() <= res.len(),
            "expected payload larger than the read buffer"
        );

        let mut total = 0;
        while total < expected.len() {
            let sz = stream
                .read(&mut res[total..])
                .unwrap_or_else(|e| panic!("could not read from socket {name}: {e:?}"));
            assert!(sz > 0, "connection closed before receiving all data");
            total += sz;
        }
        println!(
            "{name} received({}): {:?}",
            total,
            str::from_utf8(&res[..total])
        );
        assert_eq!(&res[..total], expected);
    }

    /// Starts a non-blocking echo server on an ephemeral port and returns the
    /// port.
    ///
    /// The accept loop waits on `barrier` before serving. Every client gets a
    /// handler thread that echoes back whatever it reads. Both the accept loop
    /// and the handlers stop once `test_finished` is set.
    fn start_server(barrier: Arc<Barrier>, test_finished: Arc<AtomicBool>) -> u16 {
        let listener =
            TcpListener::bind("127.0.0.1:0").expect("could not bind echo server listener");
        let port = listener
            .local_addr()
            .expect("could not get echo server local address")
            .port();

        listener
            .set_nonblocking(true)
            .expect("could not set echo server listener to non-blocking");

        thread::spawn(move || {
            barrier.wait();
            let mut count: u8 = 0;
            loop {
                match listener.accept() {
                    Ok((mut stream, _)) => {
                        let finished = test_finished.clone();
                        thread::spawn(move || {
                            println!("got a new client: {count}");
                            stream
                                .set_read_timeout(Some(Duration::from_secs(2)))
                                .expect("could not set read timeout on echo client");
                            let mut buf = [0; 128];
                            loop {
                                match stream.read(&mut buf[..]) {
                                    Ok(0) => break,
                                    Ok(sz) => {
                                        println!(
                                            "ECHO[{count}] got \"{:?}\"",
                                            str::from_utf8(&buf[..sz])
                                        );
                                        stream
                                            .write_all(&buf[..sz])
                                            .expect("could not echo data back");
                                    }
                                    Err(ref e)
                                        if e.kind() == std::io::ErrorKind::WouldBlock
                                            || e.kind() == std::io::ErrorKind::TimedOut =>
                                    {
                                        if finished.load(Ordering::Relaxed) {
                                            println!("backend server stopping (client handler)");
                                            break;
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        });
                        count = count.wrapping_add(1);
                    }
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        if test_finished.load(Ordering::Relaxed) {
                            println!("backend server stopping (accept loop)");
                            break;
                        }
                        thread::sleep(Duration::from_millis(50));
                    }
                    Err(e) => {
                        println!("connection failed: {e:?}");
                    }
                }
            }
        });

        port
    }

    /// Spawns a TCP worker listening on `front_port1` and returns its command
    /// channel.
    ///
    /// It then links both front ports to cluster "yolo" with one backend on
    /// `backend_port`, and waits for the four responses.
    fn start_proxy(
        backend_port: u16,
        front_port1: u16,
        front_port2: u16,
    ) -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, front_port1))
            .to_tcp(None)
            .expect("could not create listener config");

        let (mut command, channel) =
            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
        let _jg = thread::spawn(move || {
            setup_test_logger!();
            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
        });

        command
            .blocking()
            .expect("could not set command channel to blocking");
        {
            let front = RequestTcpFrontend {
                cluster_id: "yolo".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, front_port1),
                ..Default::default()
            };
            let backend = sozu_command::response::Backend {
                cluster_id: "yolo".to_owned(),
                backend_id: "yolo-0".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };

            command
                .write_message(&WorkerRequest {
                    id: "ID_YOLO1".to_owned(),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .expect("could not send AddTcpFrontend for front1");
            command
                .write_message(&WorkerRequest {
                    id: "ID_YOLO2".to_owned(),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .expect("could not send AddBackend for front1");
        }
        {
            let front = RequestTcpFrontend {
                cluster_id: "yolo".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, front_port2),
                ..Default::default()
            };
            let backend = sozu_command::response::Backend {
                cluster_id: "yolo".to_owned(),
                backend_id: "yolo-0".to_owned(),
                address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };
            command
                .write_message(&WorkerRequest {
                    id: "ID_YOLO3".to_owned(),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .expect("could not send AddTcpFrontend for front2");
            command
                .write_message(&WorkerRequest {
                    id: "ID_YOLO4".to_owned(),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .expect("could not send AddBackend for front2");
        }

        // One response per request sent above.
        for _ in 0..4 {
            println!(
                "read_message: {:?}",
                command
                    .read_message()
                    .with_context(|| "could not read message")?
            );
        }

        Ok(command)
    }
}