1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 Interest, Registry, Token,
13 net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
14 unix::SourceFd,
15};
16use rusty_ulid::Ulid;
17use sozu_command::{
18 ObjectKind,
19 config::MAX_LOOP_ITERATIONS,
20 logging::{EndpointRecord, LogContext},
21 proto::command::request::RequestType,
22};
23
24use crate::{
25 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
26 ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
27 Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
28 backends::{Backend, BackendMap},
29 pool::{Checkout, Pool},
30 protocol::{
31 Pipe,
32 pipe::WebSocketContext,
33 proxy_protocol::{
34 expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
35 },
36 },
37 retry::RetryPolicy,
38 server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
39 socket::{server_bind, stats::socket_rtt},
40 sozu_command::{
41 proto::command::{
42 Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
43 WorkerRequest, WorkerResponse,
44 },
45 ready::Ready,
46 state::ClusterId,
47 },
48 timer::TimeoutContainer,
49};
50
// The protocol states a TCP session can be in. The StateMachineBuilder!
// macro also generates a FailedUpgrade variant plus the take()/marker()/
// failed() helpers used below.
StateMachineBuilder! {
    enum TcpStateMachine {
        Pipe(Pipe<MioTcpStream, TcpListener>),
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
63
// Builds the one-line log prefix for a session: protocol tag, the session's
// LogContext (request/cluster/backend ids) and the frontend/backend token
// numbers ("<none>" while no backend token is attached yet).
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "TCP\t{}\tSession(frontend={}, backend={})\t >>>",
            $self.log_context(),
            $self.frontend_token.0,
            $self
                .backend_token
                .map(|token| token.0.to_string())
                .unwrap_or_else(|| "<none>".to_string()),
        )
    };
}
79
/// A TCP proxying session between one frontend socket and (usually) one
/// backend socket, driven by the `TcpStateMachine`.
pub struct TcpSession {
    // Backend buffer stashed here until the state machine upgrades to Pipe
    backend_buffer: Option<Checkout>,
    backend_connected: BackendConnectionStatus,
    backend_id: Option<String>,
    backend_token: Option<Token>,
    backend: Option<Rc<RefCell<Backend>>>,
    cluster_id: Option<String>,
    // Timeout applied to the backend once connected (the connect timeout is
    // used while connecting)
    configured_backend_timeout: Duration,
    // Number of failed backend connection attempts for this session
    connection_attempt: u8,
    container_backend_timeout: TimeoutContainer,
    container_frontend_timeout: TimeoutContainer,
    frontend_address: Option<SocketAddr>,
    // Frontend buffer stashed here until the state machine upgrades to Pipe
    frontend_buffer: Option<Checkout>,
    frontend_token: Token,
    // Guards close() against running twice
    has_been_closed: SessionIsToBeClosed,
    last_event: Instant,
    listener: Rc<RefCell<TcpListener>>,
    metrics: SessionMetrics,
    proxy: Rc<RefCell<TcpProxy>>,
    request_id: Ulid,
    state: TcpStateMachine,
}
102
103impl TcpSession {
    /// Builds a session for a freshly accepted frontend socket.
    ///
    /// The initial protocol state depends on `proxy_protocol`:
    /// - `RelayHeader`: read a PROXY protocol header from the client and relay
    ///   it to the backend (the backend buffer is stashed for the later
    ///   upgrade to `Pipe`),
    /// - `ExpectHeader`: parse a PROXY protocol header from the client
    ///   (both buffers stashed),
    /// - `SendHeader`: write our own PROXY protocol header to the backend
    ///   (both buffers stashed),
    /// - `None`: plain TCP, start piping right away.
    ///
    /// `wait_time` is the time the connection spent queued before the session
    /// started; it is recorded in the session metrics.
    #[allow(clippy::too_many_arguments)]
    fn new(
        backend_buffer: Checkout,
        backend_id: Option<String>,
        cluster_id: Option<String>,
        configured_backend_timeout: Duration,
        configured_connect_timeout: Duration,
        configured_frontend_timeout: Duration,
        frontend_buffer: Checkout,
        frontend_token: Token,
        listener: Rc<RefCell<TcpListener>>,
        proxy_protocol: Option<ProxyProtocolConfig>,
        proxy: Rc<RefCell<TcpProxy>>,
        socket: MioTcpStream,
        wait_time: Duration,
    ) -> TcpSession {
        let frontend_address = socket.peer_addr().ok();
        // Buffers not consumed by the initial state are kept on the session
        // so that the later upgrade to Pipe can reclaim them.
        let mut frontend_buffer_session = None;
        let mut backend_buffer_session = None;

        let request_id = Ulid::generate();

        // The frontend timer starts armed; the backend timer is only armed
        // once a backend connection attempt is made.
        let container_frontend_timeout =
            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);

        let state = match proxy_protocol {
            Some(ProxyProtocolConfig::RelayHeader) => {
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.relay", 1);
                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                    frontend_buffer,
                ))
            }
            Some(ProxyProtocolConfig::ExpectHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.expect", 1);
                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
                    container_frontend_timeout.clone(),
                    socket,
                    frontend_token,
                    request_id,
                ))
            }
            Some(ProxyProtocolConfig::SendHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.send", 1);
                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                ))
            }
            None => {
                gauge_add!("protocol.tcp", 1);
                let mut pipe = Pipe::new(
                    backend_buffer,
                    backend_id.clone(),
                    None,
                    None,
                    None,
                    None,
                    cluster_id.clone(),
                    frontend_buffer,
                    frontend_token,
                    socket,
                    listener.clone(),
                    Protocol::TCP,
                    request_id,
                    frontend_address,
                    WebSocketContext::Tcp,
                );
                pipe.set_cluster_id(cluster_id.clone());
                TcpStateMachine::Pipe(pipe)
            }
        };

        let metrics = SessionMetrics::new(Some(wait_time));
        TcpSession {
            backend_buffer: backend_buffer_session,
            backend_connected: BackendConnectionStatus::NotConnected,
            backend_id,
            backend_token: None,
            backend: None,
            cluster_id,
            configured_backend_timeout,
            connection_attempt: 0,
            container_backend_timeout,
            container_frontend_timeout,
            frontend_address,
            frontend_buffer: frontend_buffer_session,
            frontend_token,
            has_been_closed: false,
            last_event: Instant::now(),
            listener,
            metrics,
            proxy,
            request_id,
            state,
        }
    }
214
    /// Emits the access log line for this session and registers the
    /// end-of-session metrics.
    fn log_request(&self) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        self.metrics.register_end_of_session(&context);
        info_access!(
            on_failure: { incr!("unsent-access-logs") },
            message: None,
            context,
            session_address: self.frontend_address,
            backend_address: None,
            protocol: "TCP",
            endpoint: EndpointRecord::Tcp,
            // Tags are keyed by the listener's own address
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.state.front_socket()),
            server_rtt: None,
            user_agent: None,
            service_time: self.metrics.service_time(),
            response_time: self.metrics.backend_response_time(),
            request_time: self.metrics.request_time(),
            bytes_in: self.metrics.bin,
            bytes_out: self.metrics.bout,
            otel: None,
        );
    }
239
240 fn front_hup(&mut self) -> SessionResult {
241 match &mut self.state {
242 TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
243 _ => {
244 self.log_request();
245 SessionResult::Close
246 }
247 }
248 }
249
250 fn back_hup(&mut self) -> SessionResult {
251 match &mut self.state {
252 TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
253 _ => {
254 self.log_request();
255 SessionResult::Close
256 }
257 }
258 }
259
    /// Assembles the logging context (request, cluster and backend ids),
    /// borrowing the ids from this session.
    fn log_context(&self) -> LogContext<'_> {
        LogContext {
            request_id: self.request_id,
            cluster_id: self.cluster_id.as_deref(),
            backend_id: self.backend_id.as_deref(),
        }
    }
267
    /// Handles a readable event on the frontend socket: refreshes the
    /// relevant timeouts, then lets the current protocol state consume data.
    fn readable(&mut self) -> SessionResult {
        if !self.container_frontend_timeout.reset() {
            error!(
                "{} Could not reset frontend timeout on readable",
                log_context!(self)
            );
        }
        // Only refresh the backend timer once a backend is fully connected;
        // while connecting it is governed by the connect timeout.
        if self.backend_connected == BackendConnectionStatus::Connected
            && !self.container_backend_timeout.reset()
        {
            error!(
                "{} Could not reset backend timeout on readable",
                log_context!(self)
            );
        }
        match &mut self.state {
            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
            // SendProxyProtocol writes to the backend; frontend reads wait.
            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
        }
    }
291
292 fn writable(&mut self) -> SessionResult {
293 match &mut self.state {
294 TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
295 _ => SessionResult::Continue,
296 }
297 }
298
    /// Handles a readable event on the backend socket: refreshes both
    /// timeouts, then lets the pipe consume data (other states do not read
    /// from the backend).
    fn back_readable(&mut self) -> SessionResult {
        if !self.container_frontend_timeout.reset() {
            error!(
                "{} Could not reset frontend timeout on back_readable",
                log_context!(self)
            );
        }
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset backend timeout on back_readable",
                log_context!(self)
            );
        }

        match &mut self.state {
            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
            _ => SessionResult::Continue,
        }
    }
318
319 fn back_writable(&mut self) -> SessionResult {
320 match &mut self.state {
321 TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
322 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
323 TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
324 TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
325 TcpStateMachine::FailedUpgrade(_) => {
326 unreachable!()
327 }
328 }
329 }
330
331 fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
332 match &mut self.state {
333 TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
334 TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
335 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
336 TcpStateMachine::ExpectProxyProtocol(_) => None,
337 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
338 }
339 }
340
341 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
342 let new_state = match self.state.take() {
343 TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
344 TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
345 TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
346 TcpStateMachine::Pipe(_) => None,
347 TcpStateMachine::FailedUpgrade(_) => todo!(),
348 };
349
350 match new_state {
351 Some(state) => {
352 self.state = state;
353 false
354 } None => true,
357 }
358 }
359
360 fn upgrade_send(
361 &mut self,
362 send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
363 ) -> Option<TcpStateMachine> {
364 if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
365 let mut pipe = send_proxy_protocol.into_pipe(
366 self.frontend_buffer.take().unwrap(),
367 self.backend_buffer.take().unwrap(),
368 self.listener.clone(),
369 );
370
371 pipe.set_cluster_id(self.cluster_id.clone());
372 gauge_add!("protocol.proxy.send", -1);
373 gauge_add!("protocol.tcp", 1);
374 return Some(TcpStateMachine::Pipe(pipe));
375 }
376
377 error!(
378 "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
379 log_context!(self)
380 );
381 None
382 }
383
384 fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
385 if self.backend_buffer.is_some() {
386 let mut pipe =
387 rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
388 pipe.set_cluster_id(self.cluster_id.clone());
389 gauge_add!("protocol.proxy.relay", -1);
390 gauge_add!("protocol.tcp", 1);
391 return Some(TcpStateMachine::Pipe(pipe));
392 }
393
394 error!(
395 "{} Missing the backend buffer queue, we can't switch to a pipe",
396 log_context!(self)
397 );
398 None
399 }
400
401 fn upgrade_expect(
402 &mut self,
403 epp: ExpectProxyProtocol<MioTcpStream>,
404 ) -> Option<TcpStateMachine> {
405 if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
406 let mut pipe = epp.into_pipe(
407 self.frontend_buffer.take().unwrap(),
408 self.backend_buffer.take().unwrap(),
409 None,
410 None,
411 self.listener.clone(),
412 );
413
414 pipe.set_cluster_id(self.cluster_id.clone());
415 gauge_add!("protocol.proxy.expect", -1);
416 gauge_add!("protocol.tcp", 1);
417 return Some(TcpStateMachine::Pipe(pipe));
418 }
419
420 error!(
421 "{} Missing the backend buffer queue, we can't switch to a pipe",
422 log_context!(self)
423 );
424 None
425 }
426
427 fn front_readiness(&mut self) -> &mut Readiness {
428 match &mut self.state {
429 TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
430 TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
431 TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
432 TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
433 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
434 }
435 }
436
437 fn back_readiness(&mut self) -> Option<&mut Readiness> {
438 match &mut self.state {
439 TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
440 TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
441 TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
442 TcpStateMachine::ExpectProxyProtocol(_) => None,
443 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
444 }
445 }
446
447 fn set_back_socket(&mut self, socket: MioTcpStream) {
448 match &mut self.state {
449 TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
450 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
451 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
452 TcpStateMachine::ExpectProxyProtocol(_) => {
453 error!(
454 "{} We should not set the back socket for the expect proxy protocol",
455 log_context!(self)
456 );
457 panic!(
458 "{} We should not set the back socket for the expect proxy protocol",
459 log_context!(self)
460 );
461 }
462 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
463 }
464 }
465
466 fn set_back_token(&mut self, token: Token) {
467 self.backend_token = Some(token);
468
469 match &mut self.state {
470 TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
471 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
472 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
473 TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
474 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
475 }
476 }
477
478 fn set_backend_id(&mut self, id: String) {
479 self.backend_id = Some(id.clone());
480 if let TcpStateMachine::Pipe(pipe) = &mut self.state {
481 pipe.set_backend_id(Some(id));
482 }
483 }
484
    /// Current status of the backend connection.
    fn back_connected(&self) -> BackendConnectionStatus {
        self.backend_connected
    }
488
    /// Records a change of backend connection status.
    ///
    /// On a transition to `Connected` this also: bumps the connection gauges,
    /// re-arms the backend timer with the configured (post-connect) timeout,
    /// refreshes the frontend timer, notifies a SendProxyProtocol state if
    /// active, reports a previously-down backend as up again (metric +
    /// BackendUp event), records the connection time and resets the
    /// backend's failure counters.
    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
        let last = self.backend_connected;
        self.backend_connected = status;

        if status == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", 1);
            gauge_add!(
                "connections_per_backend",
                1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );

            // Switch the backend timer from the connect timeout to the
            // configured steady-state backend timeout.
            self.container_backend_timeout
                .set_duration(self.configured_backend_timeout);
            self.container_frontend_timeout.reset();

            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
                spp.set_back_connected(BackendConnectionStatus::Connected);
            }

            if let Some(backend) = self.backend.as_ref() {
                let mut backend = backend.borrow_mut();

                // A successful connection to a backend marked down means it
                // recovered: emit the metric and the BackendUp event.
                if backend.retry_policy.is_down() {
                    incr!(
                        "backend.up",
                        self.cluster_id.as_deref(),
                        self.metrics.backend_id.as_deref()
                    );
                    info!(
                        "backend server {} at {} is up",
                        backend.backend_id, backend.address
                    );
                    push_event(Event {
                        kind: EventKind::BackendUp as i32,
                        backend_id: Some(backend.backend_id.to_owned()),
                        address: Some(backend.address.into()),
                        cluster_id: None,
                    });
                }

                if let BackendConnectionStatus::Connecting(start) = last {
                    backend.set_connection_time(Instant::now() - start);
                }

                backend.failures = 0;
                backend.retry_policy.succeed();
            }
        }
    }
543
544 fn remove_backend(&mut self) {
545 if let Some(backend) = self.backend.take() {
546 (*backend.borrow_mut()).dec_connections();
547 }
548
549 self.backend_token = None;
550 }
551
    /// Records a failed connection attempt on the current backend: bumps its
    /// failure counter, feeds the retry policy and, when the policy newly
    /// marks the backend as down, emits the metric and a BackendDown event.
    fn fail_backend_connection(&mut self) {
        if let Some(backend) = self.backend.as_ref() {
            let backend = &mut *backend.borrow_mut();
            backend.failures += 1;

            // Remember whether the backend was already down so the down
            // notification fires only on the transition.
            let already_unavailable = backend.retry_policy.is_down();
            backend.retry_policy.fail();
            incr!(
                "backend.connections.error",
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
            if !already_unavailable && backend.retry_policy.is_down() {
                error!(
                    "backend server {} at {} is down",
                    backend.backend_id, backend.address
                );
                incr!(
                    "backend.down",
                    self.cluster_id.as_deref(),
                    self.metrics.backend_id.as_deref()
                );

                push_event(Event {
                    kind: EventKind::BackendDown as i32,
                    backend_id: Some(backend.backend_id.to_owned()),
                    address: Some(backend.address.into()),
                    cluster_id: None,
                });
            }
        }
    }
584
585 pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
586 match self.back_socket_mut() {
587 Some(ref mut s) => {
588 let mut tmp = [0u8; 1];
589 let res = s.peek(&mut tmp[..]);
590
591 match res {
592 Ok(0) => false,
594 Ok(_) => true,
595 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
596 }
597 }
598 None => false,
599 }
600 }
601
    /// Cancels both the frontend and backend timers (used when the session
    /// is shutting down).
    pub fn cancel_timeouts(&mut self) {
        self.container_frontend_timeout.cancel();
        self.container_backend_timeout.cancel();
    }
606
    /// Core event-processing loop of the session, run from `ready`.
    ///
    /// First drives the backend connection state machine (retrying failed
    /// attempts, promoting `Connecting` to `Connected`), then handles a
    /// frontend HUP, then repeatedly dispatches readable/writable/HUP events
    /// to the protocol state until no interest matches an event, a handler
    /// returns something other than `Continue`, or `MAX_LOOP_ITERATIONS` is
    /// hit (suspected infinite loop: the session is dumped and closed).
    fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
        let mut counter = 0;

        let back_connected = self.back_connected();
        if back_connected.is_connecting() {
            // A HUP on a connecting socket that also fails the peek probe
            // means the connection attempt failed: retry another backend.
            if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
                debug!("error connecting to backend, trying again");
                self.connection_attempt += 1;
                self.fail_backend_connection();

                self.close_backend();
                let connection_result = self.connect_to_backend(session.clone());
                if let Err(err) = &connection_result {
                    error!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    );
                }

                if let Some(state_result) = handle_connection_result(connection_result) {
                    return state_result;
                }
            } else if self.back_readiness().unwrap().event != Ready::EMPTY {
                // Any other backend event while connecting means the
                // connection succeeded.
                self.connection_attempt = 0;
                self.set_back_connected(BackendConnectionStatus::Connected);
            }
        } else if back_connected == BackendConnectionStatus::NotConnected {
            let connection_result = self.connect_to_backend(session.clone());
            if let Err(err) = &connection_result {
                error!(
                    "{} Error connecting to backend: {}",
                    log_context!(self),
                    err
                );
            }

            if let Some(state_result) = handle_connection_result(connection_result) {
                return state_result;
            }
        }

        if self.front_readiness().event.is_hup() {
            let session_result = self.front_hup();
            if session_result == SessionResult::Continue {
                self.front_readiness().event.remove(Ready::HUP);
            }
            return session_result;
        }

        while counter < MAX_LOOP_ITERATIONS {
            // Only events the state registered interest in are dispatched.
            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            trace!(
                "{} Frontend interest({:?}) and backend interest({:?})",
                log_context!(self),
                front_interest,
                back_interest
            );

            if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
                break;
            }

            // The backend hung up but the frontend cannot absorb more data
            // right now: wait for the frontend to become writable again.
            if self
                .back_readiness()
                .map(|r| r.event.is_hup())
                .unwrap_or(false)
                && self.front_readiness().interest.is_writable()
                && !self.front_readiness().event.is_writable()
            {
                break;
            }

            if front_interest.is_readable() {
                let session_result = self.readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_writable() {
                let session_result = self.back_writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_readable() {
                let session_result = self.back_readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_writable() {
                let session_result = self.writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_hup() {
                let session_result = self.back_hup();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_error() {
                error!(
                    "{} Frontend socket error, disconnecting",
                    log_context!(self)
                );
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                return SessionResult::Close;
            }

            if back_interest.is_error() && self.back_hup() == SessionResult::Close {
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                error!("{} backend socket error, disconnecting", log_context!(self));
                return SessionResult::Close;
            }

            counter += 1;
        }

        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self),
                MAX_LOOP_ITERATIONS
            );

            incr!("tcp.infinite_loop.error");

            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            let back = self.back_readiness().cloned();

            error!(
                "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
                log_context!(self),
                self.front_readiness(),
                back,
                front_interest,
                back_interest
            );

            self.print_session();

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
781
    /// Detaches the session from its backend: deregisters the backend socket
    /// and removes its token from the session slab, releases the backend,
    /// shuts the socket down and resets the status and gauges.
    fn close_backend(&mut self) {
        if let (Some(token), Some(fd)) = (
            self.backend_token,
            self.back_socket_mut().map(|s| s.as_raw_fd()),
        ) {
            let proxy = self.proxy.borrow();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering socket({:?}): {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }

            proxy.sessions.borrow_mut().slab.try_remove(token.0);
        }
        self.remove_backend();

        let back_connected = self.back_connected();
        if back_connected != BackendConnectionStatus::NotConnected {
            if let Some(r) = self.back_readiness() {
                r.event = Ready::EMPTY;
            }

            // Computed before back_socket_mut() to avoid borrowing self twice
            let log_context = log_context!(self);
            if let Some(sock) = self.back_socket_mut() {
                if let Err(e) = sock.shutdown(Shutdown::Both) {
                    // NotConnected is expected when the peer already closed
                    if e.kind() != ErrorKind::NotConnected {
                        error!(
                            "{} Error closing back socket({:?}): {:?}",
                            log_context, sock, e
                        );
                    }
                }
            }
        }

        if back_connected == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", -1);
            gauge_add!(
                "connections_per_backend",
                -1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
        }

        self.set_back_connected(BackendConnectionStatus::NotConnected);
    }
833
    /// Picks a backend for the listener's cluster and starts connecting to
    /// it: reserves a slab slot for the backend token, registers the socket
    /// for read/write events, arms the connect timeout and records the
    /// backend id in the state and metrics.
    ///
    /// # Errors
    /// - the listener has no cluster configured,
    /// - the retry budget (`CONN_RETRIES`) is exhausted,
    /// - the session slab is at capacity,
    /// - no backend is available for the cluster.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        let cluster_id = self
            .listener
            .borrow()
            .cluster_id
            .clone()
            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;

        self.cluster_id = Some(cluster_id.clone());

        if self.connection_attempt >= CONN_RETRIES {
            error!(
                "{} Max connection attempt reached ({})",
                log_context!(self),
                self.connection_attempt
            );
            return Err(BackendConnectionError::MaxConnectionRetries(Some(
                cluster_id,
            )));
        }

        if self.proxy.borrow().sessions.borrow().at_capacity() {
            return Err(BackendConnectionError::MaxSessionsMemory);
        }

        let (backend, mut stream) = self
            .proxy
            .borrow()
            .backends
            .borrow_mut()
            .backend_from_cluster_id(&cluster_id)
            .map_err(BackendConnectionError::Backend)?;

        // Best effort: a failure to set TCP_NODELAY is logged but not fatal
        if let Err(e) = stream.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }
        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());

        // Reserve a slab entry pointing back to this session so backend
        // events are routed here.
        let back_token = {
            let proxy = self.proxy.borrow();
            let mut s = proxy.sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            let back_token = Token(entry.key());
            let _entry = entry.insert(session_rc.clone());
            back_token
        };

        if let Err(e) = self.proxy.borrow().registry.register(
            &mut stream,
            back_token,
            Interest::READABLE | Interest::WRITABLE,
        ) {
            error!(
                "{} Error registering back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }

        // Arm the connect timeout on the backend token
        self.container_backend_timeout.set(back_token);

        self.set_back_token(back_token);
        self.set_back_socket(stream);

        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
        self.metrics.backend_start();
        self.set_backend_id(backend.borrow().backend_id.clone());

        Ok(BackendConnectAction::New)
    }
913}
914
915impl ProxySession for TcpSession {
    /// Shuts the session down: adjusts the protocol gauges, cancels timers,
    /// shuts down and deregisters the frontend socket, removes the session
    /// from the slab and detaches the backend. Idempotent thanks to
    /// `has_been_closed`.
    fn close(&mut self) {
        if self.has_been_closed {
            return;
        }

        trace!("{} Closing TCP session", log_context!(self));
        self.metrics.service_stop();

        // Decrement the gauge of whichever protocol state we were in
        match self.state.marker() {
            StateMarker::Pipe => gauge_add!("protocol.tcp", -1),
            StateMarker::SendProxyProtocol => gauge_add!("protocol.proxy.send", -1),
            StateMarker::RelayProxyProtocol => gauge_add!("protocol.proxy.relay", -1),
            StateMarker::ExpectProxyProtocol => gauge_add!("protocol.proxy.expect", -1),
        }

        // A failed upgrade has no socket/slab resources to release: count it
        // and stop here.
        if self.state.failed() {
            match self.state.marker() {
                StateMarker::Pipe => incr!("tcp.upgrade.pipe.failed"),
                StateMarker::SendProxyProtocol => incr!("tcp.upgrade.send.failed"),
                StateMarker::RelayProxyProtocol => incr!("tcp.upgrade.relay.failed"),
                StateMarker::ExpectProxyProtocol => incr!("tcp.upgrade.expect.failed"),
            }
            return;
        }

        self.cancel_timeouts();

        let front_socket = self.state.front_socket();
        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
            // NotConnected is expected when the peer already closed
            if e.kind() != ErrorKind::NotConnected {
                error!(
                    "{} Error shutting down front socket({:?}): {:?}",
                    log_context!(self),
                    front_socket,
                    e
                );
            }
        }

        {
            let proxy = self.proxy.borrow();
            let fd = front_socket.as_raw_fd();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }
            proxy
                .sessions
                .borrow_mut()
                .slab
                .try_remove(self.frontend_token.0);
        }

        self.close_backend();
        self.has_been_closed = true;
    }
980
981 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
982 if self.frontend_token == token {
983 self.container_frontend_timeout.triggered();
984 return true;
985 }
986 if self.backend_token == Some(token) {
987 self.container_backend_timeout.triggered();
988 return true;
989 }
990 false
992 }
993
    /// This session always speaks plain TCP.
    fn protocol(&self) -> Protocol {
        Protocol::TCP
    }
997
998 fn update_readiness(&mut self, token: Token, events: Ready) {
999 trace!(
1000 "{} token {:?} got event {}",
1001 log_context!(self),
1002 token,
1003 super::ready_to_string(events)
1004 );
1005
1006 self.last_event = Instant::now();
1007 self.metrics.wait_start();
1008
1009 if self.frontend_token == token {
1010 self.front_readiness().event = self.front_readiness().event | events;
1011 } else if self.backend_token == Some(token) {
1012 if let Some(r) = self.back_readiness() {
1013 r.event |= events;
1014 }
1015 }
1016 }
1017
1018 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1019 self.metrics.service_start();
1020
1021 let session_result = self.ready_inner(session.clone());
1022
1023 let to_bo_closed = match session_result {
1024 SessionResult::Close => true,
1025 SessionResult::Continue => false,
1026 SessionResult::Upgrade => match self.upgrade() {
1027 false => self.ready(session),
1028 true => true,
1029 },
1030 };
1031
1032 self.metrics.service_stop();
1033 to_bo_closed
1034 }
1035
    /// TCP sessions cannot be drained gracefully: always report ready to
    /// close on proxy shutdown.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        true
    }

    /// Timestamp of the last readiness event, used for idle bookkeeping.
    fn last_event(&self) -> Instant {
        self.last_event
    }
1043
    /// Dumps the session's state, tokens, readiness and metrics to the error
    /// log (used for diagnosing stuck sessions, e.g. the infinite-loop guard
    /// in `ready_inner`).
    fn print_session(&self) {
        let state: String = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
            TcpStateMachine::Pipe(_) => String::from("TCP"),
            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
        };

        let front_readiness = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        let back_readiness = match &self.state {
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
            TcpStateMachine::ExpectProxyProtocol(_) => None,
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        error!(
            "\
{} Session ({:?})
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
            log_context!(self),
            state,
            self.frontend_token,
            front_readiness,
            self.backend_token,
            back_readiness,
            self.backend_connected,
            self.cluster_id
        );
        error!("Metrics: {:?}", self.metrics);
    }
1087
    /// Token identifying the frontend socket in the session slab.
    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
1091}
1092
/// A TCP listening socket and its configuration, serving one cluster.
pub struct TcpListener {
    // true once activate() has bound and registered the socket
    active: SessionIsToBeClosed,
    address: SocketAddr,
    // The cluster whose backends accepted connections are proxied to
    cluster_id: Option<String>,
    config: TcpListenerConfig,
    // None until activate() is called
    listener: Option<MioTcpListener>,
    // Access-log tags, keyed by frontend address string
    tags: BTreeMap<String, CachedTags>,
    token: Token,
}
1102
1103impl ListenerHandler for TcpListener {
1104 fn get_addr(&self) -> &SocketAddr {
1105 &self.address
1106 }
1107
1108 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1109 self.tags.get(key)
1110 }
1111
1112 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1113 match tags {
1114 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1115 None => self.tags.remove(&key),
1116 };
1117 }
1118}
1119
1120impl TcpListener {
1121 fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
1122 Ok(TcpListener {
1123 cluster_id: None,
1124 listener: None,
1125 token,
1126 address: config.address.into(),
1127 config,
1128 active: false,
1129 tags: BTreeMap::new(),
1130 })
1131 }
1132
1133 pub fn activate(
1134 &mut self,
1135 registry: &Registry,
1136 tcp_listener: Option<MioTcpListener>,
1137 ) -> Result<Token, ProxyError> {
1138 if self.active {
1139 return Ok(self.token);
1140 }
1141
1142 let mut listener = match tcp_listener {
1143 Some(listener) => listener,
1144 None => {
1145 let address = self.config.address.into();
1146 server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
1147 }
1148 };
1149
1150 registry
1151 .register(&mut listener, self.token, Interest::READABLE)
1152 .map_err(ProxyError::RegisterListener)?;
1153
1154 self.listener = Some(listener);
1155 self.active = true;
1156 Ok(self.token)
1157 }
1158}
1159
1160fn handle_connection_result(
1161 connection_result: Result<BackendConnectAction, BackendConnectionError>,
1162) -> Option<SessionResult> {
1163 match connection_result {
1164 Ok(BackendConnectAction::Reuse) => None,
1166 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1167 Some(SessionResult::Continue)
1169 }
1170 Err(_) => {
1171 Some(SessionResult::Close)
1174 }
1175 }
1176}
1177
/// Per-cluster settings the TCP proxy needs when opening backend connections.
#[derive(Debug)]
pub struct ClusterConfiguration {
    // whether (and in which mode) to use the PROXY protocol for this cluster
    proxy_protocol: Option<ProxyProtocolConfig>,
 }
1184
/// Worker-side TCP proxy: owns the listeners, the per-cluster configuration
/// and the mapping from cluster ids to listener tokens.
pub struct TcpProxy {
    // cluster id -> token of the listener serving that cluster's frontend
    fronts: HashMap<String, Token>,
    // shared registry of backend servers
    backends: Rc<RefCell<BackendMap>>,
    // all listeners, keyed by their event-loop token
    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
    // per-cluster settings (only the proxy-protocol mode, see ClusterConfiguration)
    configs: HashMap<ClusterId, ClusterConfiguration>,
    // mio registry used to (de)register sockets with the event loop
    registry: Registry,
    // shared session slab/manager
    sessions: Rc<RefCell<SessionManager>>,
    // shared buffer pool for frontend/backend buffers
    pool: Rc<RefCell<Pool>>,
}
1194
impl TcpProxy {
    /// Creates a TCP proxy with no listeners, frontends or cluster configs yet.
    pub fn new(
        registry: Registry,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
    ) -> TcpProxy {
        TcpProxy {
            backends,
            listeners: HashMap::new(),
            configs: HashMap::new(),
            fronts: HashMap::new(),
            registry,
            sessions,
            pool,
        }
    }

    /// Registers a new (inactive) listener under `token`.
    ///
    /// Fails with `ListenerAlreadyPresent` if the token is already taken.
    pub fn add_listener(
        &mut self,
        config: TcpListenerConfig,
        token: Token,
    ) -> Result<Token, ProxyError> {
        match self.listeners.entry(token) {
            Entry::Vacant(entry) => {
                let tcp_listener =
                    TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
                entry.insert(Rc::new(RefCell::new(tcp_listener)));
                Ok(token)
            }
            _ => Err(ProxyError::ListenerAlreadyPresent),
        }
    }

    /// Drops every listener bound to `address`.
    ///
    /// Returns `true` if at least one listener was removed.
    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
        let len = self.listeners.len();

        self.listeners.retain(|_, l| l.borrow().address != address);
        self.listeners.len() < len
    }

    /// Activates the listener bound to `addr`, optionally reusing an already
    /// bound socket (e.g. handed over from a previous worker).
    pub fn activate_listener(
        &self,
        addr: &SocketAddr,
        tcp_listener: Option<MioTcpListener>,
    ) -> Result<Token, ProxyError> {
        let listener = self
            .listeners
            .values()
            .find(|listener| listener.borrow().address == *addr)
            .ok_or(ProxyError::NoListenerFound(*addr))?;

        listener.borrow_mut().activate(&self.registry, tcp_listener)
    }

    /// Takes the sockets out of every activated listener (for handover),
    /// leaving the listeners themselves in place but socket-less.
    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
        self.listeners
            .values()
            .filter_map(|listener| {
                let mut owned = listener.borrow_mut();
                if let Some(listener) = owned.listener.take() {
                    return Some((owned.address, listener));
                }

                None
            })
            .collect()
    }

    /// Takes the socket out of the listener bound to `address`.
    ///
    /// Errors if no listener matches or if it was never activated.
    pub fn give_back_listener(
        &mut self,
        address: SocketAddr,
    ) -> Result<(Token, MioTcpListener), ProxyError> {
        let listener = self
            .listeners
            .values()
            .find(|listener| listener.borrow().address == address)
            .ok_or(ProxyError::NoListenerFound(address))?;

        let mut owned = listener.borrow_mut();

        let taken_listener = owned
            .listener
            .take()
            .ok_or(ProxyError::UnactivatedListener)?;

        Ok((owned.token, taken_listener))
    }

    /// Links a cluster to the listener bound to the frontend's address:
    /// records the cluster -> token mapping and stores the frontend tags
    /// and cluster id on the listener.
    pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
        let address = front.address.into();

        let mut listener = self
            .listeners
            .values()
            .find(|l| l.borrow().address == address)
            .ok_or(ProxyError::NoListenerFound(address))?
            .borrow_mut();

        self.fronts
            .insert(front.cluster_id.to_string(), listener.token);
        listener.set_tags(address.to_string(), Some(front.tags));
        listener.cluster_id = Some(front.cluster_id);
        Ok(())
    }

    /// Unlinks a frontend: clears the listener's tags and cluster id, and
    /// removes the cluster -> token mapping if one was set.
    pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
        let address = front.address.into();

        let mut listener = match self
            .listeners
            .values()
            .find(|l| l.borrow().address == address)
        {
            Some(l) => l.borrow_mut(),
            None => return Err(ProxyError::NoListenerFound(address)),
        };

        listener.set_tags(address.to_string(), None);
        if let Some(cluster_id) = listener.cluster_id.take() {
            self.fronts.remove(&cluster_id);
        }
        Ok(())
    }
}
1320
impl ProxyConfiguration for TcpProxy {
    /// Applies a worker request (frontends, clusters, listeners, shutdown, ...)
    /// and answers with a `WorkerResponse` keyed on the request id.
    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
        let request_type = match message.content.request_type {
            Some(t) => t,
            None => return WorkerResponse::error(message.id, "Empty request"),
        };
        match request_type {
            RequestType::AddTcpFrontend(front) => {
                if let Err(err) = self.add_tcp_front(front) {
                    return WorkerResponse::error(message.id, err);
                }

                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveTcpFrontend(front) => {
                if let Err(err) = self.remove_tcp_front(front) {
                    return WorkerResponse::error(message.id, err);
                }

                WorkerResponse::ok(message.id)
            }
            RequestType::SoftStop(_) => {
                info!("{} processing soft shutdown", message.id);
                // stop accepting new connections: drop every listener and
                // deregister its socket, but answer `processing` so that
                // existing sessions can drain before the worker exits
                let listeners: HashMap<_, _> = self.listeners.drain().collect();
                for (_, l) in listeners.iter() {
                    l.borrow_mut()
                        .listener
                        .take()
                        .map(|mut sock| self.registry.deregister(&mut sock));
                }
                WorkerResponse::processing(message.id)
            }
            RequestType::HardStop(_) => {
                info!("{} hard shutdown", message.id);
                // same as soft stop, but answer `ok` immediately
                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
                for (_, l) in listeners.drain() {
                    l.borrow_mut()
                        .listener
                        .take()
                        .map(|mut sock| self.registry.deregister(&mut sock));
                }
                WorkerResponse::ok(message.id)
            }
            RequestType::Status(_) => {
                info!("{} status", message.id);
                WorkerResponse::ok(message.id)
            }
            RequestType::AddCluster(cluster) => {
                // for raw TCP only the proxy-protocol setting is relevant;
                // an unrecognized numeric value is silently dropped to None
                let config = ClusterConfiguration {
                    proxy_protocol: cluster
                        .proxy_protocol
                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
                };
                self.configs.insert(cluster.cluster_id, config);
                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveCluster(cluster_id) => {
                self.configs.remove(&cluster_id);
                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveListener(remove) => {
                if !self.remove_listener(remove.address.into()) {
                    WorkerResponse::error(
                        message.id,
                        format!("no TCP listener to remove at address {:?}", remove.address),
                    )
                } else {
                    WorkerResponse::ok(message.id)
                }
            }
            command => {
                debug!(
                    "{} unsupported message for TCP proxy, ignoring {:?}",
                    message.id, command
                );
                WorkerResponse::error(message.id, "unsupported message")
            }
        }
    }

    /// Accepts one pending connection on the listener identified by `token`.
    ///
    /// `WouldBlock` means the accept queue is drained; any other I/O error
    /// (and a missing or unactivated listener) collapses to `IoError`.
    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
        let internal_token = Token(token.0);
        if let Some(listener) = self.listeners.get(&internal_token) {
            if let Some(tcp_listener) = &listener.borrow().listener {
                tcp_listener
                    .accept()
                    .map(|(frontend_sock, _)| frontend_sock)
                    .map_err(|e| match e.kind() {
                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
                        _ => {
                            error!("accept() IO error: {:?}", e);
                            AcceptError::IoError
                        }
                    })
            } else {
                // listener exists but was never activated (no socket)
                Err(AcceptError::IoError)
            }
        } else {
            // no listener registered under this token
            Err(AcceptError::IoError)
        }
    }

    /// Wraps a freshly accepted frontend socket in a `TcpSession`:
    /// checks out two buffers from the pool, verifies the listener is linked
    /// to a cluster, registers the socket with the event loop, and inserts
    /// the session into the shared slab.
    fn create_session(
        &mut self,
        mut frontend_sock: MioTcpStream,
        token: ListenToken,
        wait_time: Duration,
        proxy: Rc<RefCell<Self>>,
    ) -> Result<(), AcceptError> {
        let listener_token = Token(token.0);

        let listener = self
            .listeners
            .get(&listener_token)
            .ok_or(AcceptError::IoError)?;

        let owned = listener.borrow();
        let mut pool = self.pool.borrow_mut();

        // both front and back buffers must be available; otherwise apply
        // backpressure by refusing to accept until buffers are released
        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
            (Some(fb), Some(bb)) => (fb, bb),
            _ => {
                error!("could not get buffers from pool");
                error!(
                    "Buffer capacity has been reached, stopping to accept new connections for now"
                );
                gauge!("accept_queue.backpressure", 1);
                self.sessions.borrow_mut().can_accept = false;

                return Err(AcceptError::BufferCapacityReached);
            }
        };

        if owned.cluster_id.is_none() {
            error!(
                "listener at address {:?} has no linked cluster",
                owned.address
            );
            return Err(AcceptError::IoError);
        }

        // unwrap is safe: cluster_id was checked just above
        let proxy_protocol = self
            .configs
            .get(owned.cluster_id.as_ref().unwrap())
            .and_then(|c| c.proxy_protocol);

        // nodelay failure is non-fatal, only logged
        if let Err(e) = frontend_sock.set_nodelay(true) {
            error!(
                "error setting nodelay on front socket({:?}): {:?}",
                frontend_sock, e
            );
        }

        // reserve a slab slot first so its key can serve as the session token
        let mut session_manager = self.sessions.borrow_mut();
        let entry = session_manager.slab.vacant_entry();
        let frontend_token = Token(entry.key());

        if let Err(register_error) = self.registry.register(
            &mut frontend_sock,
            frontend_token,
            Interest::READABLE | Interest::WRITABLE,
        ) {
            error!(
                "error registering front socket({:?}): {:?}",
                frontend_sock, register_error
            );
            return Err(AcceptError::RegisterError);
        }

        let session = TcpSession::new(
            back_buffer,
            None,
            owned.cluster_id.clone(),
            Duration::from_secs(owned.config.back_timeout as u64),
            Duration::from_secs(owned.config.connect_timeout as u64),
            Duration::from_secs(owned.config.front_timeout as u64),
            front_buffer,
            frontend_token,
            listener.clone(),
            proxy_protocol,
            proxy,
            frontend_sock,
            wait_time,
        );
        incr!("tcp.requests");

        let session = Rc::new(RefCell::new(session));
        entry.insert(session);

        Ok(())
    }
}
1514
1515pub mod testing {
1516 use crate::testing::*;
1517
1518 pub fn start_tcp_worker(
1520 config: TcpListenerConfig,
1521 max_buffers: usize,
1522 buffer_size: usize,
1523 channel: ProxyChannel,
1524 ) -> anyhow::Result<()> {
1525 let address = config.address.into();
1526
1527 let ServerParts {
1528 event_loop,
1529 registry,
1530 sessions,
1531 pool,
1532 backends,
1533 client_scm_socket: _,
1534 server_scm_socket,
1535 server_config,
1536 } = prebuild_server(max_buffers, buffer_size, true)?;
1537
1538 let token = {
1539 let mut sessions = sessions.borrow_mut();
1540 let entry = sessions.slab.vacant_entry();
1541 let key = entry.key();
1542 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1543 protocol: Protocol::TCPListen,
1544 })));
1545 Token(key)
1546 };
1547
1548 let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1549 proxy
1550 .add_listener(config, token)
1551 .with_context(|| "Failed at creating adding the listener")?;
1552 proxy
1553 .activate_listener(&address, None)
1554 .with_context(|| "Failed at creating activating the listener")?;
1555
1556 let mut server = Server::new(
1557 event_loop,
1558 channel,
1559 server_scm_socket,
1560 sessions,
1561 pool,
1562 backends,
1563 None,
1564 None,
1565 Some(proxy),
1566 server_config,
1567 None,
1568 false,
1569 )
1570 .with_context(|| "Failed at creating server")?;
1571
1572 debug!("starting event loop");
1573 server.run();
1574 debug!("ending event loop");
1575 Ok(())
1576 }
1577}
1578
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        net::{Shutdown, TcpListener, TcpStream},
        str,
        sync::{
            Arc, Barrier,
            atomic::{AtomicBool, Ordering},
        },
        thread,
    };

    use sozu_command::{
        channel::Channel,
        config::ListenerBuilder,
        proto::command::{
            LoadBalancingParams, RequestTcpFrontend, SocketAddress, WorkerRequest, WorkerResponse,
            request::RequestType,
        },
    };

    use super::testing::start_tcp_worker;
    use crate::testing::*;

    // signals the echo backend threads that the test is over so they stop looping
    static TEST_FINISHED: AtomicBool = AtomicBool::new(false);

    /// End-to-end round trip: three clients connect through the proxy on port
    /// 1234 to the echo backend on port 5678 and must each get their own
    /// payload back, unmixed.
    #[test]
    fn round_trip() {
        setup_test_logger!();
        let barrier = Arc::new(Barrier::new(2));
        start_server(barrier.clone());
        let _tx = start_proxy().expect("Could not start proxy");
        barrier.wait();

        let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
        let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
        let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");

        // write_all (not write) so a partial write cannot silently truncate
        // the payload and break the assertions below
        s1.write_all(&b"hello "[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();
        println!("s1 sent");

        s2.write_all(&b"pouet pouet"[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();

        println!("s2 sent");

        let mut res = [0; 128];
        s1.write_all(&b"coucou"[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();

        s3.shutdown(Shutdown::Both).unwrap();
        let sz2 = s2
            .read(&mut res[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .expect("could not read from socket");
        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);

        let sz1 = s1
            .read(&mut res[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .expect("could not read from socket");
        println!(
            "s1 received again({}): {:?}",
            sz1,
            str::from_utf8(&res[..sz1])
        );
        assert_eq!(&res[..sz1], &b"hello coucou"[..]);
        TEST_FINISHED.store(true, Ordering::Relaxed);
    }

    /// Spawns an echo server on 127.0.0.1:5678, one thread per client;
    /// `barrier` lets the test wait until the acceptor thread is ready.
    fn start_server(barrier: Arc<Barrier>) {
        let listener = TcpListener::bind("127.0.0.1:5678").expect("could not bind");
        // echoes everything read back to the client until TEST_FINISHED is set
        fn handle_client(stream: &mut TcpStream, id: u8) {
            let mut buf = [0; 128];
            let _response = b" END";
            while let Ok(sz) = stream.read(&mut buf[..]) {
                if sz > 0 {
                    println!("ECHO[{}] got \"{:?}\"", id, str::from_utf8(&buf[..sz]));
                    // write_all: a short write here would drop echoed bytes
                    stream.write_all(&buf[..sz]).unwrap();
                }
                if TEST_FINISHED.load(Ordering::Relaxed) {
                    println!("backend server stopping");
                    break;
                }
            }
        }

        let mut count = 0;
        thread::spawn(move || {
            barrier.wait();
            for conn in listener.incoming() {
                match conn {
                    Ok(mut stream) => {
                        thread::spawn(move || {
                            println!("got a new client: {count}");
                            handle_client(&mut stream, count)
                        });
                    }
                    Err(e) => {
                        println!("connection failed: {e:?}");
                    }
                }
                count += 1;
            }
        });
    }

    /// Boots a TCP worker listening on 127.0.0.1:1234 and configures two
    /// frontends for cluster "yolo", both backed by the echo server on port
    /// 5678. Waits for the four worker acks, then returns the command channel.
    pub fn start_proxy() -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, 1234))
            .to_tcp(None)
            .expect("could not create listener config");

        let (mut command, channel) =
            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
        let _jg = thread::spawn(move || {
            setup_test_logger!();
            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
        });

        command.blocking().unwrap();
        {
            let front = RequestTcpFrontend {
                cluster_id: String::from("yolo"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 1234),
                ..Default::default()
            };
            // crate path unified with the rest of the file (was `sozu_command_lib`)
            let backend = sozu_command::response::Backend {
                cluster_id: String::from("yolo"),
                backend_id: String::from("yolo-0"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };

            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO1"),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .unwrap();
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO2"),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .unwrap();
        }
        {
            let front = RequestTcpFrontend {
                cluster_id: String::from("yolo"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 1235),
                ..Default::default()
            };
            let backend = sozu_command::response::Backend {
                cluster_id: String::from("yolo"),
                backend_id: String::from("yolo-0"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO3"),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .unwrap();
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO4"),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .unwrap();
        }

        // wait for the four acks so the configuration is applied before returning
        for _ in 0..4 {
            println!(
                "read_message: {:?}",
                command
                    .read_message()
                    .with_context(|| "could not read message")?
            );
        }

        Ok(command)
    }
}