1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 Interest, Registry, Token,
13 net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14 unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18 ObjectKind,
19 config::MAX_LOOP_ITERATIONS,
20 logging::{EndpointRecord, LogContext, ansi_palette},
21 proto::command::request::RequestType,
22};
23
24use crate::metrics::names;
25use crate::{
26 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
27 ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
28 Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
29 backends::{Backend, BackendMap},
30 pool::{Checkout, Pool},
31 protocol::{
32 Pipe,
33 pipe::WebSocketContext,
34 proxy_protocol::{
35 expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
36 },
37 },
38 retry::RetryPolicy,
39 server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
40 socket::{server_bind, stats::socket_rtt},
41 sozu_command::{
42 proto::command::{
43 Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
44 UpdateTcpListenerConfig, WorkerRequest, WorkerResponse,
45 },
46 ready::Ready,
47 state::ClusterId,
48 },
49 timer::TimeoutContainer,
50};
51
52StateMachineBuilder! {
53 enum TcpStateMachine {
58 Pipe(Pipe<MioTcpStream, TcpListener>),
59 SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
60 RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
61 ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
62 }
63}
64
65macro_rules! log_context {
70 ($self:expr) => {{
71 let (open, reset, grey, gray, white) = ansi_palette();
72 format!(
73 "{gray}{ctx}{reset}\t{open}TCP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset})\t >>>",
74 open = open,
75 reset = reset,
76 grey = grey,
77 gray = gray,
78 white = white,
79 ctx = $self.log_context(),
80 frontend = $self.frontend_token.0,
81 backend = $self
82 .backend_token
83 .map(|token| token.0.to_string())
84 .unwrap_or_else(|| "<none>".to_string()),
85 )
86 }};
87}
88
89macro_rules! log_module_context {
97 () => {{
98 let (open, reset, _, _, _) = sozu_command::logging::ansi_palette();
99 format!("{open}TCP{reset}\t >>>", open = open, reset = reset)
100 }};
101}
102
103pub struct TcpSession {
104 backend_buffer: Option<Checkout>,
105 backend_connected: BackendConnectionStatus,
106 backend_id: Option<String>,
107 backend_token: Option<Token>,
108 backend: Option<Rc<RefCell<Backend>>>,
109 cluster_id: Option<String>,
110 configured_backend_timeout: Duration,
111 connection_attempt: u8,
112 container_backend_timeout: TimeoutContainer,
113 container_frontend_timeout: TimeoutContainer,
114 frontend_address: Option<SocketAddr>,
115 frontend_buffer: Option<Checkout>,
116 frontend_token: Token,
117 has_been_closed: SessionIsToBeClosed,
118 last_event: Instant,
119 listener: Rc<RefCell<TcpListener>>,
120 metrics: SessionMetrics,
121 proxy: Rc<RefCell<TcpProxy>>,
122 request_id: Ulid,
123 state: TcpStateMachine,
124 cluster_ip_tracked: bool,
133}
134
135impl TcpSession {
136 #[allow(clippy::too_many_arguments)]
137 fn new(
138 backend_buffer: Checkout,
139 backend_id: Option<String>,
140 cluster_id: Option<String>,
141 configured_backend_timeout: Duration,
142 configured_connect_timeout: Duration,
143 configured_frontend_timeout: Duration,
144 frontend_buffer: Checkout,
145 frontend_token: Token,
146 listener: Rc<RefCell<TcpListener>>,
147 proxy_protocol: Option<ProxyProtocolConfig>,
148 proxy: Rc<RefCell<TcpProxy>>,
149 socket: MioTcpStream,
150 wait_time: Duration,
151 ) -> TcpSession {
152 let frontend_address = socket.peer_addr().ok();
153 let mut frontend_buffer_session = None;
154 let mut backend_buffer_session = None;
155
156 let request_id = Ulid::generate();
157
158 let container_frontend_timeout =
159 TimeoutContainer::new(configured_frontend_timeout, frontend_token);
160 let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);
161
162 let state = match proxy_protocol {
163 Some(ProxyProtocolConfig::RelayHeader) => {
164 backend_buffer_session = Some(backend_buffer);
165 gauge_add!(names::protocol::PROXY_RELAY, 1);
166 TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
167 socket,
168 frontend_token,
169 request_id,
170 None,
171 frontend_buffer,
172 ))
173 }
174 Some(ProxyProtocolConfig::ExpectHeader) => {
175 frontend_buffer_session = Some(frontend_buffer);
176 backend_buffer_session = Some(backend_buffer);
177 gauge_add!(names::protocol::PROXY_EXPECT, 1);
178 TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
179 container_frontend_timeout.clone(),
180 socket,
181 frontend_token,
182 request_id,
183 ))
184 }
185 Some(ProxyProtocolConfig::SendHeader) => {
186 frontend_buffer_session = Some(frontend_buffer);
187 backend_buffer_session = Some(backend_buffer);
188 gauge_add!(names::protocol::PROXY_SEND, 1);
189 TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
190 socket,
191 frontend_token,
192 request_id,
193 None,
194 ))
195 }
196 None => {
197 gauge_add!(names::protocol::TCP, 1);
198 let mut pipe = Pipe::new(
199 backend_buffer,
200 backend_id.clone(),
201 None,
202 None,
203 None,
204 None,
205 cluster_id.clone(),
206 frontend_buffer,
207 frontend_token,
208 socket,
209 listener.clone(),
210 Protocol::TCP,
211 request_id,
212 request_id,
213 frontend_address,
214 WebSocketContext::Tcp,
215 );
216 pipe.set_cluster_id(cluster_id.clone());
217 TcpStateMachine::Pipe(pipe)
218 }
219 };
220
221 let metrics = SessionMetrics::new(Some(wait_time));
222 TcpSession {
225 backend_buffer: backend_buffer_session,
226 backend_connected: BackendConnectionStatus::NotConnected,
227 backend_id,
228 backend_token: None,
229 backend: None,
230 cluster_id,
231 configured_backend_timeout,
232 connection_attempt: 0,
233 container_backend_timeout,
234 container_frontend_timeout,
235 frontend_address,
236 frontend_buffer: frontend_buffer_session,
237 frontend_token,
238 has_been_closed: false,
239 last_event: Instant::now(),
240 listener,
241 metrics,
242 proxy,
243 request_id,
244 state,
245 cluster_ip_tracked: false,
246 }
247 }
248
249 fn effective_session_address(&self) -> Option<SocketAddr> {
257 match &self.state {
258 TcpStateMachine::Pipe(pipe) => pipe.get_session_address(),
259 TcpStateMachine::ExpectProxyProtocol(epp) => {
260 epp.addresses.as_ref().and_then(|pa| pa.source())
261 }
262 TcpStateMachine::RelayProxyProtocol(rpp) => {
263 rpp.addresses.as_ref().and_then(|pa| pa.source())
264 }
265 TcpStateMachine::SendProxyProtocol(_) | TcpStateMachine::FailedUpgrade(_) => None,
266 }
267 .or(self.frontend_address)
268 }
269
270 fn log_request(&self) {
271 let listener = self.listener.borrow();
272 let context = self.log_context();
273 self.metrics.register_end_of_session(&context);
274 info_access!(
275 on_failure: { incr!(names::access_logs::UNSENT) },
276 message: None,
277 context,
278 session_address: self.frontend_address,
279 backend_address: None,
280 protocol: "TCP",
281 endpoint: EndpointRecord::Tcp,
282 tags: listener.get_tags(&listener.get_addr().to_string()),
283 client_rtt: socket_rtt(self.state.front_socket()),
284 server_rtt: None,
285 user_agent: None,
286 x_request_id: None,
287 tls_version: None,
291 tls_cipher: None,
292 tls_sni: None,
293 tls_alpn: None,
294 xff_chain: None,
295 service_time: self.metrics.service_time(),
296 response_time: self.metrics.backend_response_time(),
297 request_time: self.metrics.request_time(),
298 bytes_in: self.metrics.bin,
299 bytes_out: self.metrics.bout,
300 otel: None,
301 );
302 }
303
304 fn front_hup(&mut self) -> SessionResult {
305 match &mut self.state {
306 TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
307 _ => {
308 self.log_request();
309 SessionResult::Close
310 }
311 }
312 }
313
314 fn back_hup(&mut self) -> SessionResult {
315 match &mut self.state {
316 TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
317 _ => {
318 self.log_request();
319 SessionResult::Close
320 }
321 }
322 }
323
324 fn log_context(&self) -> LogContext<'_> {
325 LogContext {
326 session_id: self.request_id,
327 request_id: Some(self.request_id),
328 cluster_id: self.cluster_id.as_deref(),
329 backend_id: self.backend_id.as_deref(),
330 }
331 }
332
333 fn readable(&mut self) -> SessionResult {
334 if !self.container_frontend_timeout.reset() {
335 error!(
336 "{} Could not reset frontend timeout on readable",
337 log_context!(self)
338 );
339 }
340 if self.backend_connected == BackendConnectionStatus::Connected
341 && !self.container_backend_timeout.reset()
342 {
343 error!(
344 "{} Could not reset backend timeout on readable",
345 log_context!(self)
346 );
347 }
348 match &mut self.state {
349 TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
350 TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
351 TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
352 TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
353 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
354 }
355 }
356
357 fn writable(&mut self) -> SessionResult {
358 match &mut self.state {
359 TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
360 _ => SessionResult::Continue,
361 }
362 }
363
364 fn back_readable(&mut self) -> SessionResult {
365 if !self.container_frontend_timeout.reset() {
366 error!(
367 "{} Could not reset frontend timeout on back_readable",
368 log_context!(self)
369 );
370 }
371 if !self.container_backend_timeout.reset() {
372 error!(
373 "{} Could not reset backend timeout on back_readable",
374 log_context!(self)
375 );
376 }
377
378 match &mut self.state {
379 TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
380 _ => SessionResult::Continue,
381 }
382 }
383
384 fn back_writable(&mut self) -> SessionResult {
385 match &mut self.state {
386 TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
387 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
388 TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
389 TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
390 TcpStateMachine::FailedUpgrade(_) => {
391 unreachable!()
392 }
393 }
394 }
395
396 fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
397 match &mut self.state {
398 TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
399 TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
400 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
401 TcpStateMachine::ExpectProxyProtocol(_) => None,
402 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
403 }
404 }
405
406 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
407 let new_state = match self.state.take() {
408 TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
409 TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
410 TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
411 TcpStateMachine::Pipe(_) => None,
412 TcpStateMachine::FailedUpgrade(_) => todo!(),
413 };
414
415 match new_state {
416 Some(state) => {
417 self.state = state;
418 false
419 } None => true,
422 }
423 }
424
425 fn upgrade_send(
426 &mut self,
427 send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
428 ) -> Option<TcpStateMachine> {
429 if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
430 let mut pipe = send_proxy_protocol.into_pipe(
431 self.frontend_buffer.take().unwrap(),
432 self.backend_buffer.take().unwrap(),
433 self.listener.clone(),
434 );
435
436 pipe.set_cluster_id(self.cluster_id.clone());
437 gauge_add!(names::protocol::PROXY_SEND, -1);
438 gauge_add!(names::protocol::TCP, 1);
439 return Some(TcpStateMachine::Pipe(pipe));
440 }
441
442 error!(
443 "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
444 log_context!(self)
445 );
446 None
447 }
448
449 fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
450 if self.backend_buffer.is_some() {
451 let mut pipe =
452 rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
453 pipe.set_cluster_id(self.cluster_id.clone());
454 gauge_add!(names::protocol::PROXY_RELAY, -1);
455 gauge_add!(names::protocol::TCP, 1);
456 return Some(TcpStateMachine::Pipe(pipe));
457 }
458
459 error!(
460 "{} Missing the backend buffer queue, we can't switch to a pipe",
461 log_context!(self)
462 );
463 None
464 }
465
466 fn upgrade_expect(
467 &mut self,
468 epp: ExpectProxyProtocol<MioTcpStream>,
469 ) -> Option<TcpStateMachine> {
470 if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
471 let mut pipe = epp.into_pipe(
472 self.frontend_buffer.take().unwrap(),
473 self.backend_buffer.take().unwrap(),
474 None,
475 None,
476 self.listener.clone(),
477 );
478
479 pipe.set_cluster_id(self.cluster_id.clone());
480 gauge_add!(names::protocol::PROXY_EXPECT, -1);
481 gauge_add!(names::protocol::TCP, 1);
482 return Some(TcpStateMachine::Pipe(pipe));
483 }
484
485 error!(
486 "{} Missing the backend buffer queue, we can't switch to a pipe",
487 log_context!(self)
488 );
489 None
490 }
491
492 fn front_readiness(&mut self) -> &mut Readiness {
493 match &mut self.state {
494 TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
495 TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
496 TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
497 TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
498 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
499 }
500 }
501
502 fn back_readiness(&mut self) -> Option<&mut Readiness> {
503 match &mut self.state {
504 TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
505 TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
506 TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
507 TcpStateMachine::ExpectProxyProtocol(_) => None,
508 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
509 }
510 }
511
512 fn set_back_socket(&mut self, socket: MioTcpStream) {
513 match &mut self.state {
514 TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
515 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
516 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
517 TcpStateMachine::ExpectProxyProtocol(_) => {
518 error!(
519 "{} We should not set the back socket for the expect proxy protocol",
520 log_context!(self)
521 );
522 panic!(
523 "{} We should not set the back socket for the expect proxy protocol",
524 log_context!(self)
525 );
526 }
527 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
528 }
529 }
530
531 fn set_back_token(&mut self, token: Token) {
532 self.backend_token = Some(token);
533
534 match &mut self.state {
535 TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
536 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
537 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
538 TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
539 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
540 }
541 }
542
543 fn set_backend_id(&mut self, id: String) {
544 self.backend_id = Some(id.clone());
545 if let TcpStateMachine::Pipe(pipe) = &mut self.state {
546 pipe.set_backend_id(Some(id));
547 }
548 }
549
550 fn back_connected(&self) -> BackendConnectionStatus {
551 self.backend_connected
552 }
553
554 fn set_back_connected(&mut self, status: BackendConnectionStatus) {
555 let last = self.backend_connected;
556 self.backend_connected = status;
557
558 if status == BackendConnectionStatus::Connected {
559 gauge_add!(names::backend::CONNECTIONS, 1);
560 gauge_add!(
561 names::backend::CONNECTIONS_PER_BACKEND,
562 1,
563 self.cluster_id.as_deref(),
564 self.metrics.backend_id.as_deref()
565 );
566
567 self.container_backend_timeout
570 .set_duration(self.configured_backend_timeout);
571 self.container_frontend_timeout.reset();
572
573 if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
574 spp.set_back_connected(BackendConnectionStatus::Connected);
575 }
576
577 if let Some(backend) = self.backend.as_ref() {
578 let mut backend = backend.borrow_mut();
579
580 if backend.retry_policy.is_down() {
581 incr!(
582 "backend.up",
583 self.cluster_id.as_deref(),
584 self.metrics.backend_id.as_deref()
585 );
586 gauge!(
587 names::backend::AVAILABLE,
588 1,
589 self.cluster_id.as_deref(),
590 self.metrics.backend_id.as_deref()
591 );
592 info!(
593 "{} backend server {} at {} is up",
594 log_context!(self),
595 backend.backend_id,
596 backend.address
597 );
598 push_event(Event {
599 kind: EventKind::BackendUp as i32,
600 backend_id: Some(backend.backend_id.to_owned()),
601 address: Some(backend.address.into()),
602 cluster_id: None,
603 metric_detail: None,
604 });
605 }
606
607 if let BackendConnectionStatus::Connecting(start) = last {
608 backend.set_connection_time(Instant::now() - start);
609 }
610
611 backend.failures = 0;
613 backend.retry_policy.succeed();
614 }
615 }
616 }
617
618 fn remove_backend(&mut self) {
619 if let Some(backend) = self.backend.take() {
620 (*backend.borrow_mut()).dec_connections();
621 }
622
623 self.backend_token = None;
624 }
625
626 fn fail_backend_connection(&mut self) {
627 if let Some(backend) = self.backend.as_ref() {
628 let backend = &mut *backend.borrow_mut();
629 backend.failures += 1;
630
631 let already_unavailable = backend.retry_policy.is_down();
632 backend.retry_policy.fail();
633 incr!(
634 "backend.connections.error",
635 self.cluster_id.as_deref(),
636 self.metrics.backend_id.as_deref()
637 );
638 if !already_unavailable && backend.retry_policy.is_down() {
639 error!(
640 "{} backend server {} at {} is down",
641 log_context!(self),
642 backend.backend_id,
643 backend.address
644 );
645 incr!(
646 "backend.down",
647 self.cluster_id.as_deref(),
648 self.metrics.backend_id.as_deref()
649 );
650 gauge!(
651 names::backend::AVAILABLE,
652 0,
653 self.cluster_id.as_deref(),
654 self.metrics.backend_id.as_deref()
655 );
656
657 push_event(Event {
658 kind: EventKind::BackendDown as i32,
659 backend_id: Some(backend.backend_id.to_owned()),
660 address: Some(backend.address.into()),
661 cluster_id: None,
662 metric_detail: None,
663 });
664 }
665 }
666 }
667
668 pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
669 match self.back_socket_mut() {
670 Some(ref mut s) => {
671 let mut tmp = [0u8; 1];
672 let res = s.peek(&mut tmp[..]);
673
674 match res {
675 Ok(0) => false,
677 Ok(_) => true,
678 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
679 }
680 }
681 None => false,
682 }
683 }
684
685 pub fn cancel_timeouts(&mut self) {
686 self.container_frontend_timeout.cancel();
687 self.container_backend_timeout.cancel();
688 }
689
690 fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
691 let mut counter = 0;
692
693 let back_connected = self.back_connected();
694 if back_connected.is_connecting() {
695 if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
696 debug!(
698 "{} error connecting to backend, trying again",
699 log_context!(self)
700 );
701 self.connection_attempt += 1;
702 self.fail_backend_connection();
703
704 self.close_backend();
706 let connection_result = self.connect_to_backend(session.clone());
707 if let Err(err) = &connection_result {
708 match err {
709 BackendConnectionError::MaxConnectionRetries(_) => trace!(
712 "{} Error connecting to backend: {}",
713 log_context!(self),
714 err
715 ),
716 _ => warn!(
717 "{} Error connecting to backend: {}",
718 log_context!(self),
719 err
720 ),
721 }
722 }
723
724 if let Some(state_result) = handle_connection_result(connection_result) {
725 return state_result;
726 }
727 } else if self.back_readiness().unwrap().event != Ready::EMPTY {
728 self.connection_attempt = 0;
729 self.set_back_connected(BackendConnectionStatus::Connected);
730 }
731 } else if back_connected == BackendConnectionStatus::NotConnected {
732 let connection_result = self.connect_to_backend(session.clone());
733 if let Err(err) = &connection_result {
734 match err {
735 BackendConnectionError::MaxConnectionRetries(_) => trace!(
736 "{} Error connecting to backend: {}",
737 log_context!(self),
738 err
739 ),
740 _ => warn!(
741 "{} Error connecting to backend: {}",
742 log_context!(self),
743 err
744 ),
745 }
746 }
747
748 if let Some(state_result) = handle_connection_result(connection_result) {
749 return state_result;
750 }
751 }
752
753 if self.front_readiness().event.is_hup() {
754 let session_result = self.front_hup();
755 if session_result == SessionResult::Continue {
756 self.front_readiness().event.remove(Ready::HUP);
757 }
758 return session_result;
759 }
760
761 while counter < MAX_LOOP_ITERATIONS {
762 let front_interest = self.front_readiness().interest & self.front_readiness().event;
763 let back_interest = self
764 .back_readiness()
765 .map(|r| r.interest & r.event)
766 .unwrap_or(Ready::EMPTY);
767
768 trace!(
769 "{} Frontend interest({:?}) and backend interest({:?})",
770 log_context!(self),
771 front_interest,
772 back_interest
773 );
774
775 if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
776 break;
777 }
778
779 if self
780 .back_readiness()
781 .map(|r| r.event.is_hup())
782 .unwrap_or(false)
783 && self.front_readiness().interest.is_writable()
784 && !self.front_readiness().event.is_writable()
785 {
786 break;
787 }
788
789 if front_interest.is_readable() {
790 let session_result = self.readable();
791 if session_result != SessionResult::Continue {
792 return session_result;
793 }
794 }
795
796 if back_interest.is_writable() {
797 let session_result = self.back_writable();
798 if session_result != SessionResult::Continue {
799 return session_result;
800 }
801 }
802
803 if back_interest.is_readable() {
804 let session_result = self.back_readable();
805 if session_result != SessionResult::Continue {
806 return session_result;
807 }
808 }
809
810 if front_interest.is_writable() {
811 let session_result = self.writable();
812 if session_result != SessionResult::Continue {
813 return session_result;
814 }
815 }
816
817 if back_interest.is_hup() {
818 let session_result = self.back_hup();
819 if session_result != SessionResult::Continue {
820 return session_result;
821 }
822 }
823
824 if front_interest.is_error() {
825 error!(
826 "{} Frontend socket error, disconnecting",
827 log_context!(self)
828 );
829 self.front_readiness().interest = Ready::EMPTY;
830 if let Some(r) = self.back_readiness() {
831 r.interest = Ready::EMPTY;
832 }
833
834 return SessionResult::Close;
835 }
836
837 if back_interest.is_error() && self.back_hup() == SessionResult::Close {
838 self.front_readiness().interest = Ready::EMPTY;
839 if let Some(r) = self.back_readiness() {
840 r.interest = Ready::EMPTY;
841 }
842
843 error!("{} backend socket error, disconnecting", log_context!(self));
844 return SessionResult::Close;
845 }
846
847 counter += 1;
848 }
849
850 if counter >= MAX_LOOP_ITERATIONS {
851 error!(
852 "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
853 log_context!(self),
854 MAX_LOOP_ITERATIONS
855 );
856
857 incr!(names::tcp::INFINITE_LOOP_ERROR);
858
859 let front_interest = self.front_readiness().interest & self.front_readiness().event;
860 let back_interest = self
861 .back_readiness()
862 .map(|r| r.interest & r.event)
863 .unwrap_or(Ready::EMPTY);
864
865 let back = self.back_readiness().cloned();
866
867 error!(
868 "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
869 log_context!(self),
870 self.front_readiness(),
871 back,
872 front_interest,
873 back_interest
874 );
875
876 self.print_session();
877
878 return SessionResult::Close;
879 }
880
881 SessionResult::Continue
882 }
883
884 fn close_backend(&mut self) {
886 if let (Some(token), Some(fd)) = (
887 self.backend_token,
888 self.back_socket_mut().map(|s| s.as_raw_fd()),
889 ) {
890 let proxy = self.proxy.borrow();
891 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
892 error!(
893 "{} Error deregistering socket({:?}): {:?}",
894 log_context!(self),
895 fd,
896 e
897 );
898 }
899
900 proxy.sessions.borrow_mut().slab.try_remove(token.0);
901 }
902 self.remove_backend();
903
904 let back_connected = self.back_connected();
905 if back_connected != BackendConnectionStatus::NotConnected {
906 if let Some(r) = self.back_readiness() {
907 r.event = Ready::EMPTY;
908 }
909
910 let log_context = log_context!(self);
911 if let Some(sock) = self.back_socket_mut() {
912 if let Err(e) = sock.shutdown(Shutdown::Both) {
917 if e.kind() != ErrorKind::NotConnected {
918 error!(
919 "{} Error closing back socket({:?}): {:?}",
920 log_context, sock, e
921 );
922 }
923 }
924 }
925 }
926
927 if back_connected == BackendConnectionStatus::Connected {
928 gauge_add!(names::backend::CONNECTIONS, -1);
929 gauge_add!(
930 names::backend::CONNECTIONS_PER_BACKEND,
931 -1,
932 self.cluster_id.as_deref(),
933 self.metrics.backend_id.as_deref()
934 );
935 }
936
937 self.set_back_connected(BackendConnectionStatus::NotConnected);
938 }
939
940 fn connect_to_backend(
941 &mut self,
942 session_rc: Rc<RefCell<dyn ProxySession>>,
943 ) -> Result<BackendConnectAction, BackendConnectionError> {
944 let cluster_id = self
945 .listener
946 .borrow()
947 .cluster_id
948 .clone()
949 .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
950
951 self.cluster_id = Some(cluster_id.clone());
952
953 if self.connection_attempt >= CONN_RETRIES {
954 incr!(
955 "backend.connect.retries_exhausted",
956 self.cluster_id.as_deref(),
957 self.metrics.backend_id.as_deref()
958 );
959 warn!(
960 "{} Max connection attempt reached ({})",
961 log_context!(self),
962 self.connection_attempt
963 );
964 return Err(BackendConnectionError::MaxConnectionRetries(Some(
965 cluster_id,
966 )));
967 }
968
969 if self.proxy.borrow().sessions.borrow().at_capacity() {
970 return Err(BackendConnectionError::MaxSessionsMemory);
971 }
972
973 let cluster_max_connections_per_ip = self
982 .proxy
983 .borrow()
984 .configs
985 .get(&cluster_id)
986 .and_then(|c| c.max_connections_per_ip);
987 if let Some(ip) = self.effective_session_address().map(|sa| sa.ip()) {
988 let sessions_rc = self.proxy.borrow().sessions.clone();
989 let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
990 self.frontend_token,
991 &cluster_id,
992 &ip,
993 cluster_max_connections_per_ip,
994 );
995 if at_limit {
996 debug!(
997 "{} per-(cluster, source-IP) limit hit for cluster {} from {}",
998 log_context!(self),
999 cluster_id,
1000 ip
1001 );
1002 return Err(BackendConnectionError::TooManyConnectionsPerIp { cluster_id });
1003 }
1004 sessions_rc
1005 .borrow_mut()
1006 .track_cluster_ip(self.frontend_token, cluster_id.clone(), ip);
1007 self.cluster_ip_tracked = true;
1008 }
1009
1010 let (backend, mut stream) = self
1011 .proxy
1012 .borrow()
1013 .backends
1014 .borrow_mut()
1015 .backend_from_cluster_id(&cluster_id)
1016 .map_err(BackendConnectionError::Backend)?;
1017
1018 if let Err(e) = stream.set_nodelay(true) {
1019 error!(
1020 "{} Error setting nodelay on back socket({:?}): {:?}",
1021 log_context!(self),
1022 stream,
1023 e
1024 );
1025 }
1026 self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
1027
1028 let back_token = {
1029 let proxy = self.proxy.borrow();
1030 let mut s = proxy.sessions.borrow_mut();
1031 let entry = s.slab.vacant_entry();
1032 let back_token = Token(entry.key());
1033 let _entry = entry.insert(session_rc.clone());
1034 back_token
1035 };
1036
1037 if let Err(e) = self.proxy.borrow().registry.register(
1038 &mut stream,
1039 back_token,
1040 Interest::READABLE | Interest::WRITABLE,
1041 ) {
1042 error!(
1043 "{} Error registering back socket({:?}): {:?}",
1044 log_context!(self),
1045 stream,
1046 e
1047 );
1048 }
1049
1050 self.container_backend_timeout.set(back_token);
1051
1052 self.set_back_token(back_token);
1053 self.set_back_socket(stream);
1054
1055 self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
1056 self.metrics.backend_start();
1057 self.set_backend_id(backend.borrow().backend_id.clone());
1058
1059 Ok(BackendConnectAction::New)
1060 }
1061}
1062
1063impl ProxySession for TcpSession {
1064 fn close(&mut self) {
1065 if self.has_been_closed {
1066 return;
1067 }
1068
1069 trace!("{} Closing TCP session", log_context!(self));
1071 self.metrics.service_stop();
1072
1073 if self.cluster_ip_tracked {
1079 self.proxy
1080 .borrow()
1081 .sessions
1082 .borrow_mut()
1083 .untrack_all_cluster_ip(self.frontend_token);
1084 self.cluster_ip_tracked = false;
1085 }
1086
1087 match self.state.marker() {
1089 StateMarker::Pipe => gauge_add!(names::protocol::TCP, -1),
1090 StateMarker::SendProxyProtocol => gauge_add!(names::protocol::PROXY_SEND, -1),
1091 StateMarker::RelayProxyProtocol => gauge_add!(names::protocol::PROXY_RELAY, -1),
1092 StateMarker::ExpectProxyProtocol => gauge_add!(names::protocol::PROXY_EXPECT, -1),
1093 }
1094
1095 if self.state.failed() {
1096 match self.state.marker() {
1097 StateMarker::Pipe => incr!(names::tcp::UPGRADE_PIPE_FAILED),
1098 StateMarker::SendProxyProtocol => incr!(names::tcp::UPGRADE_SEND_FAILED),
1099 StateMarker::RelayProxyProtocol => incr!(names::tcp::UPGRADE_RELAY_FAILED),
1100 StateMarker::ExpectProxyProtocol => incr!(names::tcp::UPGRADE_EXPECT_FAILED),
1101 }
1102 return;
1103 }
1104
1105 self.cancel_timeouts();
1106
1107 let front_socket = self.state.front_socket();
1108 if let Err(e) = front_socket.shutdown(Shutdown::Both) {
1114 if e.kind() != ErrorKind::NotConnected {
1116 error!(
1117 "{} Error shutting down front socket({:?}): {:?}",
1118 log_context!(self),
1119 front_socket,
1120 e
1121 );
1122 }
1123 }
1124
1125 {
1127 let proxy = self.proxy.borrow();
1128 let fd = front_socket.as_raw_fd();
1129 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
1130 error!(
1131 "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
1132 log_context!(self),
1133 fd,
1134 e
1135 );
1136 }
1137 proxy
1138 .sessions
1139 .borrow_mut()
1140 .slab
1141 .try_remove(self.frontend_token.0);
1142 }
1143
1144 self.close_backend();
1145 self.has_been_closed = true;
1146 }
1147
1148 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
1149 if self.frontend_token == token {
1150 self.container_frontend_timeout.triggered();
1151 return true;
1152 }
1153 if self.backend_token == Some(token) {
1154 self.container_backend_timeout.triggered();
1155 return true;
1156 }
1157 false
1159 }
1160
1161 fn protocol(&self) -> Protocol {
1162 Protocol::TCP
1163 }
1164
1165 fn update_readiness(&mut self, token: Token, events: Ready) {
1166 trace!(
1167 "{} token {:?} got event {}",
1168 log_context!(self),
1169 token,
1170 super::ready_to_string(events)
1171 );
1172
1173 self.last_event = Instant::now();
1174 self.metrics.wait_start();
1175
1176 if self.frontend_token == token {
1177 self.front_readiness().event = self.front_readiness().event | events;
1178 } else if self.backend_token == Some(token) {
1179 if let Some(r) = self.back_readiness() {
1180 r.event |= events;
1181 }
1182 }
1183 }
1184
1185 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1186 self.metrics.service_start();
1187
1188 let session_result = self.ready_inner(session.clone());
1189
1190 let to_bo_closed = match session_result {
1191 SessionResult::Close => true,
1192 SessionResult::Continue => false,
1193 SessionResult::Upgrade => match self.upgrade() {
1194 false => self.ready(session),
1195 true => true,
1196 },
1197 };
1198
1199 self.metrics.service_stop();
1200 to_bo_closed
1201 }
1202
1203 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1204 true
1205 }
1206
1207 fn last_event(&self) -> Instant {
1208 self.last_event
1209 }
1210
1211 fn print_session(&self) {
1212 let state: String = match &self.state {
1213 TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
1214 TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
1215 TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
1216 TcpStateMachine::Pipe(_) => String::from("TCP"),
1217 TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
1218 };
1219
1220 let front_readiness = match &self.state {
1221 TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
1222 TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
1223 TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
1224 TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
1225 TcpStateMachine::FailedUpgrade(_) => None,
1226 };
1227
1228 let back_readiness = match &self.state {
1229 TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
1230 TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
1231 TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
1232 TcpStateMachine::ExpectProxyProtocol(_) => None,
1233 TcpStateMachine::FailedUpgrade(_) => None,
1234 };
1235
1236 error!(
1237 "\
1238{} Session ({:?})
1239\tFrontend:
1240\t\ttoken: {:?}\treadiness: {:?}
1241\tBackend:
1242\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
1243 log_context!(self),
1244 state,
1245 self.frontend_token,
1246 front_readiness,
1247 self.backend_token,
1248 back_readiness,
1249 self.backend_connected,
1250 self.cluster_id
1251 );
1252 error!("Metrics: {:?}", self.metrics);
1253 }
1254
1255 fn frontend_token(&self) -> Token {
1256 self.frontend_token
1257 }
1258}
1259
1260pub struct TcpListener {
1261 active: SessionIsToBeClosed,
1262 address: SocketAddr,
1263 cluster_id: Option<String>,
1264 config: TcpListenerConfig,
1265 listener: Option<MioTcpListener>,
1266 tags: BTreeMap<String, CachedTags>,
1267 token: Token,
1268}
1269
1270impl ListenerHandler for TcpListener {
1271 fn get_addr(&self) -> &SocketAddr {
1272 &self.address
1273 }
1274
1275 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1276 self.tags.get(key)
1277 }
1278
1279 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1280 match tags {
1281 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1282 None => self.tags.remove(&key),
1283 };
1284 }
1285
1286 fn protocol(&self) -> Protocol {
1287 Protocol::TCP
1288 }
1289
1290 fn public_address(&self) -> SocketAddr {
1291 self.config
1292 .public_address
1293 .map(|addr| addr.into())
1294 .unwrap_or(self.address)
1295 }
1296}
1297
1298impl TcpListener {
1299 fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1300 Ok(TcpListener {
1301 cluster_id: None,
1302 listener: None,
1303 token,
1304 address: config.address.into(),
1305 config,
1306 active: false,
1307 tags: BTreeMap::new(),
1308 })
1309 }
1310
1311 pub fn activate(
1312 &mut self,
1313 registry: &Registry,
1314 tcp_listener: Option<MioTcpListener>,
1315 ) -> Result<Token, ProxyError> {
1316 if self.active {
1317 return Ok(self.token);
1318 }
1319
1320 let mut listener = match tcp_listener {
1321 Some(listener) => listener,
1322 None => {
1323 let address = self.config.address.into();
1324 server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1325 }
1326 };
1327
1328 registry
1329 .register(&mut listener, self.token, Interest::READABLE)
1330 .map_err(ProxyError::RegisterListener)?;
1331
1332 self.listener = Some(listener);
1333 self.active = true;
1334 Ok(self.token)
1335 }
1336
1337 pub fn update_config(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), ListenerError> {
1341 if let Some(v) = patch.public_address {
1342 self.config.public_address = Some(v);
1343 }
1344 if let Some(v) = patch.expect_proxy {
1345 self.config.expect_proxy = v;
1346 }
1347 if let Some(v) = patch.front_timeout {
1348 self.config.front_timeout = v;
1349 }
1350 if let Some(v) = patch.back_timeout {
1351 self.config.back_timeout = v;
1352 }
1353 if let Some(v) = patch.connect_timeout {
1354 self.config.connect_timeout = v;
1355 }
1356 Ok(())
1357 }
1358}
1359
1360fn handle_connection_result(
1361 connection_result: Result<BackendConnectAction, BackendConnectionError>,
1362) -> Option<SessionResult> {
1363 match connection_result {
1364 Ok(BackendConnectAction::Reuse) => None,
1366 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1367 Some(SessionResult::Continue)
1369 }
1370 Err(_) => {
1371 Some(SessionResult::Close)
1374 }
1375 }
1376}
1377
1378#[derive(Debug)]
1379pub struct ClusterConfiguration {
1380 proxy_protocol: Option<ProxyProtocolConfig>,
1381 pub max_connections_per_ip: Option<u64>,
1389}
1390
1391pub struct TcpProxy {
1392 fronts: HashMap<String, Token>,
1393 backends: Rc<RefCell<BackendMap>>,
1394 listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
1395 configs: HashMap<ClusterId, ClusterConfiguration>,
1396 registry: Registry,
1397 sessions: Rc<RefCell<SessionManager>>,
1398 pool: Rc<RefCell<Pool>>,
1399}
1400
1401impl TcpProxy {
1402 pub fn new(
1403 registry: Registry,
1404 sessions: Rc<RefCell<SessionManager>>,
1405 pool: Rc<RefCell<Pool>>,
1406 backends: Rc<RefCell<BackendMap>>,
1407 ) -> TcpProxy {
1408 TcpProxy {
1409 backends,
1410 listeners: HashMap::new(),
1411 configs: HashMap::new(),
1412 fronts: HashMap::new(),
1413 registry,
1414 sessions,
1415 pool,
1416 }
1417 }
1418
1419 pub fn add_listener(
1420 &mut self,
1421 config: TcpListenerConfig,
1422 token: Token,
1423 ) -> Result<Token, ProxyError> {
1424 match self.listeners.entry(token) {
1425 Entry::Vacant(entry) => {
1426 let tcp_listener =
1427 TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1428 entry.insert(Rc::new(RefCell::new(tcp_listener)));
1429 Ok(token)
1430 }
1431 _ => Err(ProxyError::ListenerAlreadyPresent),
1432 }
1433 }
1434
1435 pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1436 let len = self.listeners.len();
1437
1438 self.listeners.retain(|_, l| l.borrow().address != address);
1439 self.listeners.len() < len
1440 }
1441
1442 pub fn activate_listener(
1443 &self,
1444 addr: &SocketAddr,
1445 tcp_listener: Option<MioTcpListener>,
1446 ) -> Result<Token, ProxyError> {
1447 let listener = self
1448 .listeners
1449 .values()
1450 .find(|listener| listener.borrow().address == *addr)
1451 .ok_or(ProxyError::NoListenerFound(*addr))?;
1452
1453 listener.borrow_mut().activate(&self.registry, tcp_listener)
1454 }
1455
1456 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1457 self.listeners
1458 .values()
1459 .filter_map(|listener| {
1460 let mut owned = listener.borrow_mut();
1461 if let Some(listener) = owned.listener.take() {
1462 owned.active = false;
1465 return Some((owned.address, listener));
1466 }
1467
1468 None
1469 })
1470 .collect()
1471 }
1472
1473 pub fn give_back_listener(
1474 &mut self,
1475 address: SocketAddr,
1476 ) -> Result<(Token, MioTcpListener), ProxyError> {
1477 let listener = self
1478 .listeners
1479 .values()
1480 .find(|listener| listener.borrow().address == address)
1481 .ok_or(ProxyError::NoListenerFound(address))?;
1482
1483 let mut owned = listener.borrow_mut();
1484
1485 let taken_listener = owned
1486 .listener
1487 .take()
1488 .ok_or(ProxyError::UnactivatedListener)?;
1489
1490 owned.active = false;
1493
1494 Ok((owned.token, taken_listener))
1495 }
1496
1497 pub fn update_listener(&mut self, patch: UpdateTcpListenerConfig) -> Result<(), ProxyError> {
1499 let address: SocketAddr = patch.address.into();
1500 let listener = self
1501 .listeners
1502 .values()
1503 .find(|l| l.borrow().address == address)
1504 .ok_or(ProxyError::NoListenerFound(address))?;
1505 listener
1506 .borrow_mut()
1507 .update_config(&patch)
1508 .map_err(|listener_error| ProxyError::ListenerActivation {
1509 address,
1510 listener_error,
1511 })
1512 }
1513
1514 pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1515 let address = front.address.into();
1516
1517 let mut listener = self
1518 .listeners
1519 .values()
1520 .find(|l| l.borrow().address == address)
1521 .ok_or(ProxyError::NoListenerFound(address))?
1522 .borrow_mut();
1523
1524 self.fronts
1525 .insert(front.cluster_id.to_string(), listener.token);
1526 listener.set_tags(address.to_string(), Some(front.tags));
1527 listener.cluster_id = Some(front.cluster_id);
1528 Ok(())
1529 }
1530
1531 pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1532 let address = front.address.into();
1533
1534 let mut listener = match self
1535 .listeners
1536 .values()
1537 .find(|l| l.borrow().address == address)
1538 {
1539 Some(l) => l.borrow_mut(),
1540 None => return Err(ProxyError::NoListenerFound(address)),
1541 };
1542
1543 listener.set_tags(address.to_string(), None);
1544 if let Some(cluster_id) = listener.cluster_id.take() {
1545 self.fronts.remove(&cluster_id);
1546 }
1547 Ok(())
1548 }
1549}
1550
1551impl ProxyConfiguration for TcpProxy {
1552 fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
1553 let request_type = match message.content.request_type {
1554 Some(t) => t,
1555 None => return WorkerResponse::error(message.id, "Empty request"),
1556 };
1557 match request_type {
1558 RequestType::AddTcpFrontend(front) => {
1559 if let Err(err) = self.add_tcp_front(front) {
1560 return WorkerResponse::error(message.id, err);
1561 }
1562
1563 WorkerResponse::ok(message.id)
1564 }
1565 RequestType::RemoveTcpFrontend(front) => {
1566 if let Err(err) = self.remove_tcp_front(front) {
1567 return WorkerResponse::error(message.id, err);
1568 }
1569
1570 WorkerResponse::ok(message.id)
1571 }
1572 RequestType::SoftStop(_) => {
1573 info!(
1574 "{} {} processing soft shutdown",
1575 log_module_context!(),
1576 message.id
1577 );
1578 let listeners: HashMap<_, _> = self.listeners.drain().collect();
1579 for (_, l) in listeners.iter() {
1580 l.borrow_mut()
1581 .listener
1582 .take()
1583 .map(|mut sock| self.registry.deregister(&mut sock));
1584 }
1585 WorkerResponse::processing(message.id)
1586 }
1587 RequestType::HardStop(_) => {
1588 info!("{} {} hard shutdown", log_module_context!(), message.id);
1589 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1590 for (_, l) in listeners.drain() {
1591 l.borrow_mut()
1592 .listener
1593 .take()
1594 .map(|mut sock| self.registry.deregister(&mut sock));
1595 }
1596 WorkerResponse::ok(message.id)
1597 }
1598 RequestType::Status(_) => {
1599 info!("{} {} status", log_module_context!(), message.id);
1600 WorkerResponse::ok(message.id)
1601 }
1602 RequestType::AddCluster(cluster) => {
1603 let config = ClusterConfiguration {
1604 proxy_protocol: cluster
1605 .proxy_protocol
1606 .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
1607 max_connections_per_ip: cluster.max_connections_per_ip,
1609 };
1610 self.configs.insert(cluster.cluster_id, config);
1611 WorkerResponse::ok(message.id)
1612 }
1613 RequestType::RemoveCluster(cluster_id) => {
1614 self.configs.remove(&cluster_id);
1615 WorkerResponse::ok(message.id)
1616 }
1617 RequestType::RemoveListener(remove) => {
1618 if !self.remove_listener(remove.address.into()) {
1619 WorkerResponse::error(
1620 message.id,
1621 format!("no TCP listener to remove at address {:?}", remove.address),
1622 )
1623 } else {
1624 WorkerResponse::ok(message.id)
1625 }
1626 }
1627 command => {
1628 debug!(
1629 "{} {} unsupported message for TCP proxy, ignoring {:?}",
1630 log_module_context!(),
1631 message.id,
1632 command
1633 );
1634 WorkerResponse::error(message.id, "unsupported message")
1635 }
1636 }
1637 }
1638
1639 fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
1640 let internal_token = Token(token.0);
1641 if let Some(listener) = self.listeners.get(&internal_token) {
1642 if let Some(tcp_listener) = &listener.borrow().listener {
1643 tcp_listener
1644 .accept()
1645 .map(|(frontend_sock, _)| frontend_sock)
1646 .map_err(|e| match e.kind() {
1647 ErrorKind::WouldBlock => AcceptError::WouldBlock,
1648 _ => {
1649 error!("{} accept() IO error: {:?}", log_module_context!(), e);
1650 AcceptError::IoError
1651 }
1652 })
1653 } else {
1654 Err(AcceptError::IoError)
1655 }
1656 } else {
1657 Err(AcceptError::IoError)
1658 }
1659 }
1660
1661 fn create_session(
1662 &mut self,
1663 mut frontend_sock: MioTcpStream,
1664 token: ListenToken,
1665 wait_time: Duration,
1666 proxy: Rc<RefCell<Self>>,
1667 ) -> Result<(), AcceptError> {
1668 let listener_token = Token(token.0);
1669
1670 let listener = self
1671 .listeners
1672 .get(&listener_token)
1673 .ok_or(AcceptError::IoError)?;
1674
1675 let owned = listener.borrow();
1676 let mut pool = self.pool.borrow_mut();
1677
1678 let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
1679 (Some(fb), Some(bb)) => (fb, bb),
1680 _ => {
1681 error!("{} could not get buffers from pool", log_module_context!());
1682 error!(
1683 "{} Buffer capacity has been reached, stopping to accept new connections for now",
1684 log_module_context!()
1685 );
1686 gauge!(names::accept_queue::BACKPRESSURE, 1);
1687 self.sessions.borrow_mut().can_accept = false;
1688
1689 return Err(AcceptError::BufferCapacityReached);
1690 }
1691 };
1692
1693 if owned.cluster_id.is_none() {
1694 error!(
1695 "{} listener at address {:?} has no linked cluster",
1696 log_module_context!(),
1697 owned.address
1698 );
1699 return Err(AcceptError::IoError);
1700 }
1701
1702 let proxy_protocol = self
1703 .configs
1704 .get(owned.cluster_id.as_ref().unwrap())
1705 .and_then(|c| c.proxy_protocol);
1706
1707 if let Err(e) = frontend_sock.set_nodelay(true) {
1708 error!(
1709 "{} error setting nodelay on front socket({:?}): {:?}",
1710 log_module_context!(),
1711 frontend_sock,
1712 e
1713 );
1714 }
1715
1716 let mut session_manager = self.sessions.borrow_mut();
1717 let entry = session_manager.slab.vacant_entry();
1718 let frontend_token = Token(entry.key());
1719
1720 if let Err(register_error) = self.registry.register(
1721 &mut frontend_sock,
1722 frontend_token,
1723 Interest::READABLE | Interest::WRITABLE,
1724 ) {
1725 error!(
1726 "{} error registering front socket({:?}): {:?}",
1727 log_module_context!(),
1728 frontend_sock,
1729 register_error
1730 );
1731 return Err(AcceptError::RegisterError);
1732 }
1733
1734 let session = TcpSession::new(
1735 back_buffer,
1736 None,
1737 owned.cluster_id.clone(),
1738 Duration::from_secs(owned.config.back_timeout as u64),
1739 Duration::from_secs(owned.config.connect_timeout as u64),
1740 Duration::from_secs(owned.config.front_timeout as u64),
1741 front_buffer,
1742 frontend_token,
1743 listener.clone(),
1744 proxy_protocol,
1745 proxy,
1746 frontend_sock,
1747 wait_time,
1748 );
1749 incr!(names::tcp::REQUESTS);
1750
1751 let session = Rc::new(RefCell::new(session));
1752 entry.insert(session);
1753
1754 Ok(())
1755 }
1756}
1757
1758pub mod testing {
1759 use crate::testing::*;
1760
1761 pub fn start_tcp_worker(
1763 config: TcpListenerConfig,
1764 max_buffers: usize,
1765 buffer_size: usize,
1766 channel: ProxyChannel,
1767 ) -> anyhow::Result<()> {
1768 let address = config.address.into();
1769
1770 let ServerParts {
1771 event_loop,
1772 registry,
1773 sessions,
1774 pool,
1775 backends,
1776 client_scm_socket: _,
1777 server_scm_socket,
1778 server_config,
1779 } = prebuild_server(max_buffers, buffer_size, true)?;
1780
1781 let token = {
1782 let mut sessions = sessions.borrow_mut();
1783 let entry = sessions.slab.vacant_entry();
1784 let key = entry.key();
1785 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1786 protocol: Protocol::TCPListen,
1787 })));
1788 Token(key)
1789 };
1790
1791 let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1792 proxy
1793 .add_listener(config, token)
1794 .with_context(|| "Failed at creating adding the listener")?;
1795 proxy
1796 .activate_listener(&address, None)
1797 .with_context(|| "Failed at creating activating the listener")?;
1798
1799 let mut server = Server::new(
1800 event_loop,
1801 channel,
1802 server_scm_socket,
1803 sessions,
1804 pool,
1805 backends,
1806 None,
1807 None,
1808 Some(proxy),
1809 server_config,
1810 None,
1811 false,
1812 )
1813 .with_context(|| "Failed at creating server")?;
1814
1815 debug!("{} starting event loop", log_module_context!());
1816 server.run();
1817 debug!("{} ending event loop", log_module_context!());
1818 Ok(())
1819 }
1820}
1821
1822#[cfg(test)]
1823mod tests {
1824 use std::{
1825 io::{Read, Write},
1826 net::{Shutdown, TcpListener, TcpStream},
1827 str,
1828 sync::{
1829 Arc, Barrier,
1830 atomic::{AtomicBool, Ordering},
1831 },
1832 thread,
1833 time::Duration,
1834 };
1835
1836 use sozu_command::{
1837 channel::Channel,
1838 config::ListenerBuilder,
1839 proto::command::{
1840 LoadBalancingParams, RequestTcpFrontend, SocketAddress, SoftStop, WorkerRequest,
1841 WorkerResponse, request::RequestType,
1842 },
1843 };
1844
1845 use super::testing::start_tcp_worker;
1846 use crate::testing::*;
1847
1848 #[test]
1862 fn round_trip() {
1863 setup_test_logger!();
1864 let barrier = Arc::new(Barrier::new(2));
1865 let test_finished = Arc::new(AtomicBool::new(false));
1866
1867 let front_port1 = provide_port();
1868 let front_port2 = provide_port();
1869
1870 let backend_port = start_server(barrier.clone(), test_finished.clone());
1871 let mut command =
1872 start_proxy(backend_port, front_port1, front_port2).expect("Could not start proxy");
1873 barrier.wait();
1874
1875 thread::scope(|_s| {
1876 let front_addr = format!("127.0.0.1:{front_port1}");
1877
1878 let mut s1 = TcpStream::connect(&front_addr).expect("could not connect");
1879 s1.set_read_timeout(Some(Duration::from_secs(5)))
1880 .expect("could not set read timeout on s1");
1881
1882 let s3 = TcpStream::connect(&front_addr).expect("could not connect");
1883
1884 let mut s2 = TcpStream::connect(&front_addr).expect("could not connect");
1885 s2.set_read_timeout(Some(Duration::from_secs(5)))
1886 .expect("could not set read timeout on s2");
1887
1888 s1.write_all(b"hello ").expect("could not write to s1");
1889 println!("s1 sent");
1890
1891 s2.write_all(b"pouet pouet").expect("could not write to s2");
1892 println!("s2 sent");
1893
1894 let mut res = [0; 128];
1895 s1.write_all(b"coucou").expect("could not write to s1");
1896
1897 s3.shutdown(Shutdown::Both).expect("could not shutdown s3");
1898
1899 let sz2 = s2
1900 .read(&mut res[..])
1901 .expect("could not read from socket s2");
1902 println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
1903 assert_eq!(&res[..sz2], &b"pouet pouet"[..]);
1904
1905 let expected = b"hello coucou";
1910 let mut total = 0;
1911 while total < expected.len() {
1912 let sz = s1
1913 .read(&mut res[total..])
1914 .expect("could not read from socket s1");
1915 assert!(sz > 0, "connection closed before receiving all data");
1916 total += sz;
1917 }
1918 println!(
1919 "s1 received again({}): {:?}",
1920 total,
1921 str::from_utf8(&res[..total])
1922 );
1923 assert_eq!(&res[..total], &expected[..]);
1924
1925 test_finished.store(true, Ordering::Relaxed);
1927
1928 command
1930 .write_message(&WorkerRequest {
1931 id: "ID_SOFTSTOP".to_owned(),
1932 content: RequestType::SoftStop(SoftStop {}).into(),
1933 })
1934 .expect("could not send SoftStop to sozu worker");
1935 });
1936 }
1937
1938 fn start_server(barrier: Arc<Barrier>, test_finished: Arc<AtomicBool>) -> u16 {
1941 let listener =
1942 TcpListener::bind("127.0.0.1:0").expect("could not bind echo server listener");
1943 let port = listener
1944 .local_addr()
1945 .expect("could not get echo server local address")
1946 .port();
1947
1948 listener
1949 .set_nonblocking(true)
1950 .expect("could not set echo server listener to non-blocking");
1951
1952 thread::spawn(move || {
1953 barrier.wait();
1954 let mut count: u8 = 0;
1955 loop {
1956 match listener.accept() {
1957 Ok((mut stream, _)) => {
1958 let finished = test_finished.clone();
1959 thread::spawn(move || {
1960 println!("got a new client: {count}");
1961 stream
1962 .set_read_timeout(Some(Duration::from_secs(2)))
1963 .expect("could not set read timeout on echo client");
1964 let mut buf = [0; 128];
1965 loop {
1966 match stream.read(&mut buf[..]) {
1967 Ok(0) => break,
1968 Ok(sz) => {
1969 println!(
1970 "ECHO[{count}] got \"{:?}\"",
1971 str::from_utf8(&buf[..sz])
1972 );
1973 stream
1974 .write_all(&buf[..sz])
1975 .expect("could not echo data back");
1976 }
1977 Err(ref e)
1978 if e.kind() == std::io::ErrorKind::WouldBlock
1979 || e.kind() == std::io::ErrorKind::TimedOut =>
1980 {
1981 if finished.load(Ordering::Relaxed) {
1982 println!("backend server stopping (client handler)");
1983 break;
1984 }
1985 }
1986 Err(_) => break,
1987 }
1988 }
1989 });
1990 count = count.wrapping_add(1);
1991 }
1992 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
1993 if test_finished.load(Ordering::Relaxed) {
1994 println!("backend server stopping (accept loop)");
1995 break;
1996 }
1997 thread::sleep(Duration::from_millis(50));
1998 }
1999 Err(e) => {
2000 println!("connection failed: {e:?}");
2001 }
2002 }
2003 }
2004 });
2005
2006 port
2007 }
2008
2009 fn start_proxy(
2011 backend_port: u16,
2012 front_port1: u16,
2013 front_port2: u16,
2014 ) -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
2015 let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, front_port1))
2016 .to_tcp(None)
2017 .expect("could not create listener config");
2018
2019 let (mut command, channel) =
2020 Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
2021 let _jg = thread::spawn(move || {
2022 setup_test_logger!();
2023 start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
2024 });
2025
2026 command
2027 .blocking()
2028 .expect("could not set command channel to blocking");
2029 {
2030 let front = RequestTcpFrontend {
2031 cluster_id: "yolo".to_owned(),
2032 address: SocketAddress::new_v4(127, 0, 0, 1, front_port1),
2033 ..Default::default()
2034 };
2035 let backend = sozu_command_lib::response::Backend {
2036 cluster_id: "yolo".to_owned(),
2037 backend_id: "yolo-0".to_owned(),
2038 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
2039 load_balancing_parameters: Some(LoadBalancingParams::default()),
2040 sticky_id: None,
2041 backup: None,
2042 };
2043
2044 command
2045 .write_message(&WorkerRequest {
2046 id: "ID_YOLO1".to_owned(),
2047 content: RequestType::AddTcpFrontend(front).into(),
2048 })
2049 .expect("could not send AddTcpFrontend for front1");
2050 command
2051 .write_message(&WorkerRequest {
2052 id: "ID_YOLO2".to_owned(),
2053 content: RequestType::AddBackend(backend.to_add_backend()).into(),
2054 })
2055 .expect("could not send AddBackend for front1");
2056 }
2057 {
2058 let front = RequestTcpFrontend {
2059 cluster_id: "yolo".to_owned(),
2060 address: SocketAddress::new_v4(127, 0, 0, 1, front_port2),
2061 ..Default::default()
2062 };
2063 let backend = sozu_command::response::Backend {
2064 cluster_id: "yolo".to_owned(),
2065 backend_id: "yolo-0".to_owned(),
2066 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
2067 load_balancing_parameters: Some(LoadBalancingParams::default()),
2068 sticky_id: None,
2069 backup: None,
2070 };
2071 command
2072 .write_message(&WorkerRequest {
2073 id: "ID_YOLO3".to_owned(),
2074 content: RequestType::AddTcpFrontend(front).into(),
2075 })
2076 .expect("could not send AddTcpFrontend for front2");
2077 command
2078 .write_message(&WorkerRequest {
2079 id: "ID_YOLO4".to_owned(),
2080 content: RequestType::AddBackend(backend.to_add_backend()).into(),
2081 })
2082 .expect("could not send AddBackend for front2");
2083 }
2084
2085 for _ in 0..4 {
2086 println!(
2087 "read_message: {:?}",
2088 command
2089 .read_message()
2090 .with_context(|| "could not read message")?
2091 );
2092 }
2093
2094 Ok(command)
2095 }
2096}