1use std::{
2 cell::RefCell,
3 collections::{hash_map::Entry, BTreeMap, HashMap},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::Rc,
8 time::{Duration, Instant},
9};
10
11use mio::{
12 net::TcpListener as MioTcpListener, net::TcpStream as MioTcpStream, unix::SourceFd, Interest,
13 Registry, Token,
14};
15use rusty_ulid::Ulid;
16
17use sozu_command::{
18 config::MAX_LOOP_ITERATIONS,
19 logging::{EndpointRecord, LogContext},
20 proto::command::request::RequestType,
21 ObjectKind,
22};
23
24use crate::{
25 backends::{Backend, BackendMap},
26 pool::{Checkout, Pool},
27 protocol::{
28 pipe::WebSocketContext,
29 proxy_protocol::{
30 expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
31 },
32 Pipe,
33 },
34 retry::RetryPolicy,
35 server::{push_event, ListenToken, SessionManager, CONN_RETRIES},
36 socket::{server_bind, stats::socket_rtt},
37 sozu_command::{
38 proto::command::{
39 Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
40 WorkerRequest, WorkerResponse,
41 },
42 ready::Ready,
43 state::ClusterId,
44 },
45 timer::TimeoutContainer,
46 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
47 ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
48 Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
49};
50
// Declares the per-session protocol state machine. The `StateMachineBuilder!`
// macro expands this into the `TcpStateMachine` enum; judging by later usage
// in this file it also generates a `FailedUpgrade` variant, a `StateMarker`
// type and the `take`/`marker`/`failed`/`front_socket` helpers — confirm in
// the macro's definition.
StateMachineBuilder! {
    enum TcpStateMachine {
        Pipe(Pipe<MioTcpStream, TcpListener>),
        SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
        RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
        ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
    }
}
63
/// Builds the log prefix for a `TcpSession`: its `LogContext` (request id,
/// cluster id, backend id) plus the frontend token and, when present, the
/// backend token (`<none>` otherwise).
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "TCP\t{}\tSession(frontend={}, backend={})\t >>>",
            $self.log_context(),
            $self.frontend_token.0,
            $self
                .backend_token
                .map(|token| token.0.to_string())
                .unwrap_or_else(|| "<none>".to_string()),
        )
    };
}
79
/// A raw TCP proxying session: one frontend connection, at most one backend
/// connection, and a protocol state machine (plain pipe or PROXY protocol).
pub struct TcpSession {
    // backend-side buffer parked here until the state machine upgrades to Pipe
    backend_buffer: Option<Checkout>,
    backend_connected: BackendConnectionStatus,
    backend_id: Option<String>,
    backend_token: Option<Token>,
    backend: Option<Rc<RefCell<Backend>>>,
    cluster_id: Option<String>,
    configured_backend_timeout: Duration,
    // number of consecutive failed backend connection attempts (capped by CONN_RETRIES)
    connection_attempt: u8,
    container_backend_timeout: TimeoutContainer,
    container_frontend_timeout: TimeoutContainer,
    // peer address of the frontend socket, if it could be read at accept time
    frontend_address: Option<SocketAddr>,
    // frontend-side buffer parked here until the state machine upgrades to Pipe
    frontend_buffer: Option<Checkout>,
    frontend_token: Token,
    has_been_closed: SessionIsToBeClosed,
    last_event: Instant,
    listener: Rc<RefCell<TcpListener>>,
    metrics: SessionMetrics,
    proxy: Rc<RefCell<TcpProxy>>,
    request_id: Ulid,
    state: TcpStateMachine,
}
102
103impl TcpSession {
    /// Builds a new `TcpSession` in its initial protocol state.
    ///
    /// Depending on `proxy_protocol`, the state machine starts as one of the
    /// three PROXY-protocol states (send/relay/expect) or directly as a
    /// `Pipe`. Buffer checkouts not consumed by the initial state are parked
    /// on the session so the later upgrade to `Pipe` can pick them up.
    #[allow(clippy::too_many_arguments)]
    fn new(
        backend_buffer: Checkout,
        backend_id: Option<String>,
        cluster_id: Option<String>,
        configured_backend_timeout: Duration,
        configured_connect_timeout: Duration,
        configured_frontend_timeout: Duration,
        frontend_buffer: Checkout,
        frontend_token: Token,
        listener: Rc<RefCell<TcpListener>>,
        proxy_protocol: Option<ProxyProtocolConfig>,
        proxy: Rc<RefCell<TcpProxy>>,
        socket: MioTcpStream,
        wait_time: Duration,
    ) -> TcpSession {
        let frontend_address = socket.peer_addr().ok();
        // buffers kept on the session until the upgrade to Pipe consumes them
        let mut frontend_buffer_session = None;
        let mut backend_buffer_session = None;

        let request_id = Ulid::generate();

        let container_frontend_timeout =
            TimeoutContainer::new(configured_frontend_timeout, frontend_token);
        // the backend timeout starts without a token; it is armed once a
        // backend connection is attempted (see connect_to_backend)
        let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);

        let state = match proxy_protocol {
            Some(ProxyProtocolConfig::RelayHeader) => {
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.relay", 1);
                TcpStateMachine::RelayProxyProtocol(RelayProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                    frontend_buffer,
                ))
            }
            Some(ProxyProtocolConfig::ExpectHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.expect", 1);
                TcpStateMachine::ExpectProxyProtocol(ExpectProxyProtocol::new(
                    container_frontend_timeout.clone(),
                    socket,
                    frontend_token,
                    request_id,
                ))
            }
            Some(ProxyProtocolConfig::SendHeader) => {
                frontend_buffer_session = Some(frontend_buffer);
                backend_buffer_session = Some(backend_buffer);
                gauge_add!("protocol.proxy.send", 1);
                TcpStateMachine::SendProxyProtocol(SendProxyProtocol::new(
                    socket,
                    frontend_token,
                    request_id,
                    None,
                ))
            }
            None => {
                // no PROXY protocol: go straight to bidirectional piping
                gauge_add!("protocol.tcp", 1);
                let mut pipe = Pipe::new(
                    backend_buffer,
                    backend_id.clone(),
                    None,
                    None,
                    None,
                    None,
                    cluster_id.clone(),
                    frontend_buffer,
                    frontend_token,
                    socket,
                    listener.clone(),
                    Protocol::TCP,
                    request_id,
                    frontend_address,
                    WebSocketContext::Tcp,
                );
                pipe.set_cluster_id(cluster_id.clone());
                TcpStateMachine::Pipe(pipe)
            }
        };

        let metrics = SessionMetrics::new(Some(wait_time));
        TcpSession {
            backend_buffer: backend_buffer_session,
            backend_connected: BackendConnectionStatus::NotConnected,
            backend_id,
            backend_token: None,
            backend: None,
            cluster_id,
            configured_backend_timeout,
            connection_attempt: 0,
            container_backend_timeout,
            container_frontend_timeout,
            frontend_address,
            frontend_buffer: frontend_buffer_session,
            frontend_token,
            has_been_closed: false,
            last_event: Instant::now(),
            listener,
            metrics,
            proxy,
            request_id,
            state,
        }
    }
214
    /// Registers end-of-session metrics and emits the access-log record for
    /// this session (tagged with the listener's tags for its own address).
    fn log_request(&self) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        self.metrics.register_end_of_session(&context);
        info_access!(
            on_failure: { incr!("unsent-access-logs") },
            message: None,
            context,
            session_address: self.frontend_address,
            backend_address: None,
            protocol: "TCP",
            endpoint: EndpointRecord::Tcp,
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.state.front_socket()),
            server_rtt: None,
            user_agent: None,
            service_time: self.metrics.service_time(),
            response_time: self.metrics.backend_response_time(),
            request_time: self.metrics.request_time(),
            bytes_in: self.metrics.bin,
            bytes_out: self.metrics.bout
        );
    }
238
239 fn front_hup(&mut self) -> SessionResult {
240 match &mut self.state {
241 TcpStateMachine::Pipe(pipe) => pipe.frontend_hup(&mut self.metrics),
242 _ => {
243 self.log_request();
244 SessionResult::Close
245 }
246 }
247 }
248
249 fn back_hup(&mut self) -> SessionResult {
250 match &mut self.state {
251 TcpStateMachine::Pipe(pipe) => pipe.backend_hup(&mut self.metrics),
252 _ => {
253 self.log_request();
254 SessionResult::Close
255 }
256 }
257 }
258
259 fn log_context(&self) -> LogContext {
260 LogContext {
261 request_id: self.request_id,
262 cluster_id: self.cluster_id.as_deref(),
263 backend_id: self.backend_id.as_deref(),
264 }
265 }
266
    /// Handles a readable event on the frontend socket: refreshes the
    /// relevant timeouts, then dispatches to the current protocol state.
    fn readable(&mut self) -> SessionResult {
        if !self.container_frontend_timeout.reset() {
            error!(
                "{} Could not reset frontend timeout on readable",
                log_context!(self)
            );
        }
        // only refresh the backend timeout once a backend is actually connected
        if self.backend_connected == BackendConnectionStatus::Connected
            && !self.container_backend_timeout.reset()
        {
            error!(
                "{} Could not reset backend timeout on readable",
                log_context!(self)
            );
        }
        match &mut self.state {
            TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
            TcpStateMachine::RelayProxyProtocol(pp) => pp.readable(&mut self.metrics),
            TcpStateMachine::ExpectProxyProtocol(pp) => pp.readable(&mut self.metrics),
            // SendProxyProtocol writes its header first; frontend reads wait
            TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
        }
    }
290
291 fn writable(&mut self) -> SessionResult {
292 match &mut self.state {
293 TcpStateMachine::Pipe(pipe) => pipe.writable(&mut self.metrics),
294 _ => SessionResult::Continue,
295 }
296 }
297
    /// Handles a readable event on the backend socket: refreshes both
    /// timeouts, then dispatches to the current protocol state (only `Pipe`
    /// actually consumes backend data).
    fn back_readable(&mut self) -> SessionResult {
        if !self.container_frontend_timeout.reset() {
            error!(
                "{} Could not reset frontend timeout on back_readable",
                log_context!(self)
            );
        }
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset backend timeout on back_readable",
                log_context!(self)
            );
        }

        match &mut self.state {
            TcpStateMachine::Pipe(pipe) => pipe.backend_readable(&mut self.metrics),
            _ => SessionResult::Continue,
        }
    }
317
318 fn back_writable(&mut self) -> SessionResult {
319 match &mut self.state {
320 TcpStateMachine::Pipe(pipe) => pipe.backend_writable(&mut self.metrics),
321 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
322 TcpStateMachine::SendProxyProtocol(pp) => pp.back_writable(&mut self.metrics),
323 TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
324 TcpStateMachine::FailedUpgrade(_) => {
325 unreachable!()
326 }
327 }
328 }
329
330 fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
331 match &mut self.state {
332 TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
333 TcpStateMachine::SendProxyProtocol(pp) => pp.back_socket_mut(),
334 TcpStateMachine::RelayProxyProtocol(pp) => pp.back_socket_mut(),
335 TcpStateMachine::ExpectProxyProtocol(_) => None,
336 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
337 }
338 }
339
340 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
341 let new_state = match self.state.take() {
342 TcpStateMachine::SendProxyProtocol(spp) => self.upgrade_send(spp),
343 TcpStateMachine::RelayProxyProtocol(rpp) => self.upgrade_relay(rpp),
344 TcpStateMachine::ExpectProxyProtocol(epp) => self.upgrade_expect(epp),
345 TcpStateMachine::Pipe(_) => None,
346 TcpStateMachine::FailedUpgrade(_) => todo!(),
347 };
348
349 match new_state {
350 Some(state) => {
351 self.state = state;
352 false
353 } None => true,
356 }
357 }
358
359 fn upgrade_send(
360 &mut self,
361 send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
362 ) -> Option<TcpStateMachine> {
363 if self.backend_buffer.is_some() && self.frontend_buffer.is_some() {
364 let mut pipe = send_proxy_protocol.into_pipe(
365 self.frontend_buffer.take().unwrap(),
366 self.backend_buffer.take().unwrap(),
367 self.listener.clone(),
368 );
369
370 pipe.set_cluster_id(self.cluster_id.clone());
371 gauge_add!("protocol.proxy.send", -1);
372 gauge_add!("protocol.tcp", 1);
373 return Some(TcpStateMachine::Pipe(pipe));
374 }
375
376 error!(
377 "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
378 log_context!(self)
379 );
380 None
381 }
382
383 fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
384 if self.backend_buffer.is_some() {
385 let mut pipe =
386 rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone());
387 pipe.set_cluster_id(self.cluster_id.clone());
388 gauge_add!("protocol.proxy.relay", -1);
389 gauge_add!("protocol.tcp", 1);
390 return Some(TcpStateMachine::Pipe(pipe));
391 }
392
393 error!(
394 "{} Missing the backend buffer queue, we can't switch to a pipe",
395 log_context!(self)
396 );
397 None
398 }
399
400 fn upgrade_expect(
401 &mut self,
402 epp: ExpectProxyProtocol<MioTcpStream>,
403 ) -> Option<TcpStateMachine> {
404 if self.frontend_buffer.is_some() && self.backend_buffer.is_some() {
405 let mut pipe = epp.into_pipe(
406 self.frontend_buffer.take().unwrap(),
407 self.backend_buffer.take().unwrap(),
408 None,
409 None,
410 self.listener.clone(),
411 );
412
413 pipe.set_cluster_id(self.cluster_id.clone());
414 gauge_add!("protocol.proxy.expect", -1);
415 gauge_add!("protocol.tcp", 1);
416 return Some(TcpStateMachine::Pipe(pipe));
417 }
418
419 error!(
420 "{} Missing the backend buffer queue, we can't switch to a pipe",
421 log_context!(self)
422 );
423 None
424 }
425
426 fn front_readiness(&mut self) -> &mut Readiness {
427 match &mut self.state {
428 TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
429 TcpStateMachine::SendProxyProtocol(pp) => &mut pp.frontend_readiness,
430 TcpStateMachine::RelayProxyProtocol(pp) => &mut pp.frontend_readiness,
431 TcpStateMachine::ExpectProxyProtocol(pp) => &mut pp.frontend_readiness,
432 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
433 }
434 }
435
436 fn back_readiness(&mut self) -> Option<&mut Readiness> {
437 match &mut self.state {
438 TcpStateMachine::Pipe(pipe) => Some(&mut pipe.backend_readiness),
439 TcpStateMachine::SendProxyProtocol(pp) => Some(&mut pp.backend_readiness),
440 TcpStateMachine::RelayProxyProtocol(pp) => Some(&mut pp.backend_readiness),
441 TcpStateMachine::ExpectProxyProtocol(_) => None,
442 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
443 }
444 }
445
    /// Installs the freshly connected backend socket into the current
    /// protocol state. Panics on the expect state, which by design never
    /// owns a backend socket.
    fn set_back_socket(&mut self, socket: MioTcpStream) {
        match &mut self.state {
            TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
            TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_socket(socket),
            TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_socket(socket),
            TcpStateMachine::ExpectProxyProtocol(_) => {
                // log first so the invariant violation is recorded even if
                // the panic message is lost
                error!(
                    "{} We should not set the back socket for the expect proxy protocol",
                    log_context!(self)
                );
                panic!(
                    "{} We should not set the back socket for the expect proxy protocol",
                    log_context!(self)
                );
            }
            TcpStateMachine::FailedUpgrade(_) => unreachable!(),
        }
    }
464
465 fn set_back_token(&mut self, token: Token) {
466 self.backend_token = Some(token);
467
468 match &mut self.state {
469 TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
470 TcpStateMachine::SendProxyProtocol(pp) => pp.set_back_token(token),
471 TcpStateMachine::RelayProxyProtocol(pp) => pp.set_back_token(token),
472 TcpStateMachine::ExpectProxyProtocol(_) => self.backend_token = Some(token),
473 TcpStateMachine::FailedUpgrade(_) => unreachable!(),
474 }
475 }
476
477 fn set_backend_id(&mut self, id: String) {
478 self.backend_id = Some(id.clone());
479 if let TcpStateMachine::Pipe(pipe) = &mut self.state {
480 pipe.set_backend_id(Some(id));
481 }
482 }
483
    /// Returns the current backend connection status.
    fn back_connected(&self) -> BackendConnectionStatus {
        self.backend_connected
    }

    /// Records a new backend connection status. On the transition to
    /// `Connected`, updates connection gauges and timeouts, notifies the
    /// send-proxy-protocol state if active, and resets the backend's failure
    /// accounting — emitting a `BackendUp` event if the backend was
    /// previously considered down by its retry policy.
    fn set_back_connected(&mut self, status: BackendConnectionStatus) {
        let last = self.backend_connected;
        self.backend_connected = status;

        if status == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", 1);
            gauge_add!(
                "connections_per_backend",
                1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );

            // swap the connect timeout for the configured per-session
            // backend timeout now that the connection is established
            self.container_backend_timeout
                .set_duration(self.configured_backend_timeout);
            self.container_frontend_timeout.reset();

            if let TcpStateMachine::SendProxyProtocol(spp) = &mut self.state {
                spp.set_back_connected(BackendConnectionStatus::Connected);
            }

            if let Some(backend) = self.backend.as_ref() {
                let mut backend = backend.borrow_mut();

                if backend.retry_policy.is_down() {
                    incr!(
                        "backend.up",
                        self.cluster_id.as_deref(),
                        self.metrics.backend_id.as_deref()
                    );
                    info!(
                        "backend server {} at {} is up",
                        backend.backend_id, backend.address
                    );
                    push_event(Event {
                        kind: EventKind::BackendUp as i32,
                        backend_id: Some(backend.backend_id.to_owned()),
                        address: Some(backend.address.into()),
                        cluster_id: None,
                    });
                }

                // measure how long the connect took, if we were connecting
                if let BackendConnectionStatus::Connecting(start) = last {
                    backend.set_connection_time(Instant::now() - start);
                }

                backend.failures = 0;
                backend.retry_policy.succeed();
            }
        }
    }
542
543 fn remove_backend(&mut self) {
544 if let Some(backend) = self.backend.take() {
545 (*backend.borrow_mut()).dec_connections();
546 }
547
548 self.backend_token = None;
549 }
550
    /// Records a failed connection attempt on the current backend: bumps its
    /// failure count, feeds the retry policy, and — on the transition from
    /// available to down — logs, counts, and emits a `BackendDown` event.
    fn fail_backend_connection(&mut self) {
        if let Some(backend) = self.backend.as_ref() {
            let backend = &mut *backend.borrow_mut();
            backend.failures += 1;

            // remember the pre-failure state so the down transition is
            // reported exactly once
            let already_unavailable = backend.retry_policy.is_down();
            backend.retry_policy.fail();
            incr!(
                "backend.connections.error",
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
            if !already_unavailable && backend.retry_policy.is_down() {
                error!(
                    "backend server {} at {} is down",
                    backend.backend_id, backend.address
                );
                incr!(
                    "backend.down",
                    self.cluster_id.as_deref(),
                    self.metrics.backend_id.as_deref()
                );

                push_event(Event {
                    kind: EventKind::BackendDown as i32,
                    backend_id: Some(backend.backend_id.to_owned()),
                    address: Some(backend.address.into()),
                    cluster_id: None,
                });
            }
        }
    }
583
584 pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
585 match self.back_socket_mut() {
586 Some(ref mut s) => {
587 let mut tmp = [0u8; 1];
588 let res = s.peek(&mut tmp[..]);
589
590 match res {
591 Ok(0) => false,
593 Ok(_) => true,
594 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
595 }
596 }
597 None => false,
598 }
599 }
600
601 pub fn cancel_timeouts(&mut self) {
602 self.container_frontend_timeout.cancel();
603 self.container_backend_timeout.cancel();
604 }
605
    /// Core event-processing loop of the session.
    ///
    /// First drives the backend connection state machine (retrying failed
    /// connects, promoting a connecting backend to connected), then handles a
    /// frontend hang-up, then loops over the combined interest/event
    /// readiness of both sides, dispatching reads/writes/HUPs until nothing
    /// is ready, a state asks to close/upgrade, or `MAX_LOOP_ITERATIONS` is
    /// reached (treated as a probable infinite-loop bug and closed).
    fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
        let mut counter = 0;

        let back_connected = self.back_connected();
        if back_connected.is_connecting() {
            // a HUP on a connecting socket that fails the peek probe means
            // the connect failed: count the failure and retry
            if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
                debug!("error connecting to backend, trying again");
                self.connection_attempt += 1;
                self.fail_backend_connection();

                self.close_backend();
                let connection_result = self.connect_to_backend(session.clone());
                if let Err(err) = &connection_result {
                    error!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    );
                }

                if let Some(state_result) = handle_connection_result(connection_result) {
                    return state_result;
                }
            } else if self.back_readiness().unwrap().event != Ready::EMPTY {
                // any readiness event on a connecting socket means the
                // connection went through
                self.connection_attempt = 0;
                self.set_back_connected(BackendConnectionStatus::Connected);
            }
        } else if back_connected == BackendConnectionStatus::NotConnected {
            let connection_result = self.connect_to_backend(session.clone());
            if let Err(err) = &connection_result {
                error!(
                    "{} Error connecting to backend: {}",
                    log_context!(self),
                    err
                );
            }

            if let Some(state_result) = handle_connection_result(connection_result) {
                return state_result;
            }
        }

        if self.front_readiness().event.is_hup() {
            let session_result = self.front_hup();
            if session_result == SessionResult::Continue {
                self.front_readiness().event.remove(Ready::HUP);
            }
            return session_result;
        }

        while counter < MAX_LOOP_ITERATIONS {
            // only act on events the current state is actually interested in
            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            trace!(
                "{} Frontend interest({:?}) and backend interest({:?})",
                log_context!(self),
                front_interest,
                back_interest
            );

            if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
                break;
            }

            // backend hung up but the frontend can still be written to and
            // isn't writable yet: wait for the frontend to drain
            if self
                .back_readiness()
                .map(|r| r.event.is_hup())
                .unwrap_or(false)
                && self.front_readiness().interest.is_writable()
                && !self.front_readiness().event.is_writable()
            {
                break;
            }

            if front_interest.is_readable() {
                let session_result = self.readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_writable() {
                let session_result = self.back_writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_readable() {
                let session_result = self.back_readable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_writable() {
                let session_result = self.writable();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if back_interest.is_hup() {
                let session_result = self.back_hup();
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if front_interest.is_error() {
                error!(
                    "{} Frontend socket error, disconnecting",
                    log_context!(self)
                );
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                return SessionResult::Close;
            }

            if back_interest.is_error() && self.back_hup() == SessionResult::Close {
                self.front_readiness().interest = Ready::EMPTY;
                if let Some(r) = self.back_readiness() {
                    r.interest = Ready::EMPTY;
                }

                error!("{} backend socket error, disconnecting", log_context!(self));
                return SessionResult::Close;
            }

            counter += 1;
        }

        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self), MAX_LOOP_ITERATIONS
            );

            incr!("tcp.infinite_loop.error");

            let front_interest = self.front_readiness().interest & self.front_readiness().event;
            let back_interest = self
                .back_readiness()
                .map(|r| r.interest & r.event)
                .unwrap_or(Ready::EMPTY);

            let back = self.back_readiness().cloned();

            error!(
                "{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
                log_context!(self),
                self.front_readiness(),
                back,
                front_interest,
                back_interest
            );

            self.print_session();

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
779
    /// Tears down the backend half of the session: deregisters the socket
    /// from the poll registry, frees its slab slot, releases the backend
    /// reference, shuts the socket down, and fixes up connection gauges.
    fn close_backend(&mut self) {
        if let (Some(token), Some(fd)) = (
            self.backend_token,
            self.back_socket_mut().map(|s| s.as_raw_fd()),
        ) {
            let proxy = self.proxy.borrow();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering socket({:?}): {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }

            // free the backend token's slot in the session slab
            proxy.sessions.borrow_mut().slab.try_remove(token.0);
        }
        self.remove_backend();

        let back_connected = self.back_connected();
        if back_connected != BackendConnectionStatus::NotConnected {
            if let Some(r) = self.back_readiness() {
                r.event = Ready::EMPTY;
            }

            let log_context = log_context!(self);
            if let Some(sock) = self.back_socket_mut() {
                if let Err(e) = sock.shutdown(Shutdown::Both) {
                    // NotConnected on shutdown is expected for a socket whose
                    // peer already went away; don't log it
                    if e.kind() != ErrorKind::NotConnected {
                        error!(
                            "{} Error closing back socket({:?}): {:?}",
                            log_context, sock, e
                        );
                    }
                }
            }
        }

        if back_connected == BackendConnectionStatus::Connected {
            gauge_add!("backend.connections", -1);
            gauge_add!(
                "connections_per_backend",
                -1,
                self.cluster_id.as_deref(),
                self.metrics.backend_id.as_deref()
            );
        }

        self.set_back_connected(BackendConnectionStatus::NotConnected);
    }
831
    /// Picks a backend for the listener's cluster and opens a connection to
    /// it: checks the retry budget and session capacity, obtains a backend
    /// and socket from the backend map, registers the socket with the poll
    /// registry under a fresh token, arms the connect timeout, and wires the
    /// socket/token/backend-id into the session state.
    ///
    /// Errors are returned (not handled here); the caller decides whether to
    /// retry or close via `handle_connection_result`.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        let cluster_id = self
            .listener
            .borrow()
            .cluster_id
            .clone()
            .ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;

        self.cluster_id = Some(cluster_id.clone());

        if self.connection_attempt >= CONN_RETRIES {
            error!(
                "{} Max connection attempt reached ({})",
                log_context!(self),
                self.connection_attempt
            );
            return Err(BackendConnectionError::MaxConnectionRetries(Some(
                cluster_id,
            )));
        }

        if self.proxy.borrow().sessions.borrow().at_capacity() {
            return Err(BackendConnectionError::MaxSessionsMemory);
        }

        let (backend, mut stream) = self
            .proxy
            .borrow()
            .backends
            .borrow_mut()
            .backend_from_cluster_id(&cluster_id)
            .map_err(BackendConnectionError::Backend)?;

        // best effort: a failed TCP_NODELAY is logged but not fatal
        if let Err(e) = stream.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }
        self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());

        // reserve a slab slot for the backend socket, pointing back at this session
        let back_token = {
            let proxy = self.proxy.borrow();
            let mut s = proxy.sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            let back_token = Token(entry.key());
            let _entry = entry.insert(session_rc.clone());
            back_token
        };

        if let Err(e) = self.proxy.borrow().registry.register(
            &mut stream,
            back_token,
            Interest::READABLE | Interest::WRITABLE,
        ) {
            error!(
                "{} Error registering back socket({:?}): {:?}",
                log_context!(self),
                stream,
                e
            );
        }

        // arm the (connect) timeout on the new backend token
        self.container_backend_timeout.set(back_token);

        self.set_back_token(back_token);
        self.set_back_socket(stream);

        self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
        self.metrics.backend_start();
        self.set_backend_id(backend.borrow().backend_id.clone());

        Ok(BackendConnectAction::New)
    }
911}
912
913impl ProxySession for TcpSession {
    /// Closes the session once: fixes the protocol gauges, records upgrade
    /// failures, cancels timeouts, shuts down and deregisters the frontend
    /// socket, frees its slab slot, and finally tears down the backend.
    /// Idempotent via `has_been_closed`.
    fn close(&mut self) {
        if self.has_been_closed {
            return;
        }

        trace!("{} Closing TCP session", log_context!(self));
        self.metrics.service_stop();

        // decrement the gauge matching whichever protocol state we died in
        match self.state.marker() {
            StateMarker::Pipe => gauge_add!("protocol.tcp", -1),
            StateMarker::SendProxyProtocol => gauge_add!("protocol.proxy.send", -1),
            StateMarker::RelayProxyProtocol => gauge_add!("protocol.proxy.relay", -1),
            StateMarker::ExpectProxyProtocol => gauge_add!("protocol.proxy.expect", -1),
        }

        // a failed upgrade has no sockets to clean up: just count it and stop
        if self.state.failed() {
            match self.state.marker() {
                StateMarker::Pipe => incr!("tcp.upgrade.pipe.failed"),
                StateMarker::SendProxyProtocol => incr!("tcp.upgrade.send.failed"),
                StateMarker::RelayProxyProtocol => incr!("tcp.upgrade.relay.failed"),
                StateMarker::ExpectProxyProtocol => incr!("tcp.upgrade.expect.failed"),
            }
            return;
        }

        self.cancel_timeouts();

        let front_socket = self.state.front_socket();
        if let Err(e) = front_socket.shutdown(Shutdown::Both) {
            // NotConnected is expected when the peer already disconnected
            if e.kind() != ErrorKind::NotConnected {
                error!(
                    "{} Error shutting down front socket({:?}): {:?}",
                    log_context!(self),
                    front_socket,
                    e
                );
            }
        }

        {
            let proxy = self.proxy.borrow();
            let fd = front_socket.as_raw_fd();
            if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
                error!(
                    "{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
                    log_context!(self),
                    fd,
                    e
                );
            }
            proxy
                .sessions
                .borrow_mut()
                .slab
                .try_remove(self.frontend_token.0);
        }

        self.close_backend();
        self.has_been_closed = true;
    }
978
979 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
980 if self.frontend_token == token {
981 self.container_frontend_timeout.triggered();
982 return true;
983 }
984 if self.backend_token == Some(token) {
985 self.container_backend_timeout.triggered();
986 return true;
987 }
988 false
990 }
991
    /// This session always speaks raw TCP.
    fn protocol(&self) -> Protocol {
        Protocol::TCP
    }
995
996 fn update_readiness(&mut self, token: Token, events: Ready) {
997 trace!(
998 "{} token {:?} got event {}",
999 log_context!(self),
1000 token,
1001 super::ready_to_string(events)
1002 );
1003
1004 self.last_event = Instant::now();
1005 self.metrics.wait_start();
1006
1007 if self.frontend_token == token {
1008 self.front_readiness().event = self.front_readiness().event | events;
1009 } else if self.backend_token == Some(token) {
1010 if let Some(r) = self.back_readiness() {
1011 r.event |= events;
1012 }
1013 }
1014 }
1015
1016 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1017 self.metrics.service_start();
1018
1019 let session_result = self.ready_inner(session.clone());
1020
1021 let to_bo_closed = match session_result {
1022 SessionResult::Close => true,
1023 SessionResult::Continue => false,
1024 SessionResult::Upgrade => match self.upgrade() {
1025 false => self.ready(session),
1026 true => true,
1027 },
1028 };
1029
1030 self.metrics.service_stop();
1031 to_bo_closed
1032 }
1033
    /// A TCP session has no graceful-shutdown handshake: it can always be
    /// closed immediately when the proxy is shutting down.
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        true
    }

    /// Timestamp of the last readiness event seen by this session.
    fn last_event(&self) -> Instant {
        self.last_event
    }
1041
    /// Dumps a human-readable snapshot of the session (state, tokens,
    /// readiness on both sides, backend status, metrics) to the error log,
    /// used when diagnosing stuck sessions.
    fn print_session(&self) {
        let state: String = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
            TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
            TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
            TcpStateMachine::Pipe(_) => String::from("TCP"),
            TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
        };

        let front_readiness = match &self.state {
            TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        let back_readiness = match &self.state {
            TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
            TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
            TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
            TcpStateMachine::ExpectProxyProtocol(_) => None,
            TcpStateMachine::FailedUpgrade(_) => None,
        };

        error!(
            "\
{} Session ({:?})
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
            log_context!(self),
            state,
            self.frontend_token,
            front_readiness,
            self.backend_token,
            back_readiness,
            self.backend_connected,
            self.cluster_id
        );
        error!("Metrics: {:?}", self.metrics);
    }

    /// Token under which the frontend socket is registered.
    fn frontend_token(&self) -> Token {
        self.frontend_token
    }
1089}
1090
/// A TCP front listener: an optionally-bound socket plus the cluster it
/// forwards to and the logging tags attached to its address.
pub struct TcpListener {
    // true once the socket is bound and registered (see activate)
    active: SessionIsToBeClosed,
    address: SocketAddr,
    // cluster the accepted sessions are routed to, when configured
    cluster_id: Option<String>,
    config: TcpListenerConfig,
    // present only after activation or when a socket was handed over
    listener: Option<MioTcpListener>,
    tags: BTreeMap<String, CachedTags>,
    token: Token,
}
1100
1101impl ListenerHandler for TcpListener {
1102 fn get_addr(&self) -> &SocketAddr {
1103 &self.address
1104 }
1105
1106 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
1107 self.tags.get(key)
1108 }
1109
1110 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
1111 match tags {
1112 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
1113 None => self.tags.remove(&key),
1114 };
1115 }
1116}
1117
impl TcpListener {
    /// Creates an inactive listener for `config`. The socket is only bound
    /// and registered later, in [`TcpListener::activate`].
    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
        Ok(TcpListener {
            cluster_id: None,
            listener: None,
            token,
            address: config.address.into(),
            config,
            active: false,
        })
    }

    /// Activates the listener: uses `tcp_listener` when one was handed over
    /// (e.g. from a previous worker), otherwise binds the configured address,
    /// then registers the socket for readable events. Idempotent: an already
    /// active listener just returns its token.
    pub fn activate(
        &mut self,
        registry: &Registry,
        tcp_listener: Option<MioTcpListener>,
    ) -> Result<Token, ProxyError> {
        if self.active {
            return Ok(self.token);
        }

        let mut listener = match tcp_listener {
            Some(listener) => listener,
            None => {
                let address = self.config.address.into();
                server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
            }
        };

        registry
            .register(&mut listener, self.token, Interest::READABLE)
            .map_err(ProxyError::RegisterListener)?;

        self.listener = Some(listener);
        self.active = true;
        Ok(self.token)
    }
}
1157
1158fn handle_connection_result(
1159 connection_result: Result<BackendConnectAction, BackendConnectionError>,
1160) -> Option<SessionResult> {
1161 match connection_result {
1162 Ok(BackendConnectAction::Reuse) => None,
1164 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
1165 Some(SessionResult::Continue)
1167 }
1168 Err(_) => {
1169 Some(SessionResult::Close)
1172 }
1173 }
1174}
1175
/// Per-cluster settings kept by the TCP proxy, filled from `AddCluster`
/// requests and consumed when creating sessions.
#[derive(Debug)]
pub struct ClusterConfiguration {
    // PROXY protocol mode for this cluster's connections, if any
    // (expect/relay/send variants exist — exact direction depends on the
    // config value; see the protocol::proxy_protocol module).
    proxy_protocol: Option<ProxyProtocolConfig>,
 }
1182
/// The TCP proxy: owns the listeners, the cluster configurations and the
/// frontend-to-listener mapping, and shares the session manager, buffer pool
/// and backend map with the rest of the server.
pub struct TcpProxy {
    // cluster_id -> listener token, maintained by add/remove_tcp_front.
    fronts: HashMap<String, Token>,
    // Shared backend map (presumably backends per cluster — managed elsewhere).
    backends: Rc<RefCell<BackendMap>>,
    // Listeners indexed by their event-loop token.
    listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
    // Per-cluster configuration (proxy protocol), from AddCluster requests.
    configs: HashMap<ClusterId, ClusterConfiguration>,
    // mio registry used to (de)register listener and session sockets.
    registry: Registry,
    // Shared session slab / accept state.
    sessions: Rc<RefCell<SessionManager>>,
    // Shared buffer pool; front/back buffers are checked out per session.
    pool: Rc<RefCell<Pool>>,
}
1192
1193impl TcpProxy {
1194 pub fn new(
1195 registry: Registry,
1196 sessions: Rc<RefCell<SessionManager>>,
1197 pool: Rc<RefCell<Pool>>,
1198 backends: Rc<RefCell<BackendMap>>,
1199 ) -> TcpProxy {
1200 TcpProxy {
1201 backends,
1202 listeners: HashMap::new(),
1203 configs: HashMap::new(),
1204 fronts: HashMap::new(),
1205 registry,
1206 sessions,
1207 pool,
1208 }
1209 }
1210
1211 pub fn add_listener(
1212 &mut self,
1213 config: TcpListenerConfig,
1214 token: Token,
1215 ) -> Result<Token, ProxyError> {
1216 match self.listeners.entry(token) {
1217 Entry::Vacant(entry) => {
1218 let tcp_listener =
1219 TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
1220 entry.insert(Rc::new(RefCell::new(tcp_listener)));
1221 Ok(token)
1222 }
1223 _ => Err(ProxyError::ListenerAlreadyPresent),
1224 }
1225 }
1226
1227 pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
1228 let len = self.listeners.len();
1229
1230 self.listeners.retain(|_, l| l.borrow().address != address);
1231 self.listeners.len() < len
1232 }
1233
1234 pub fn activate_listener(
1235 &self,
1236 addr: &SocketAddr,
1237 tcp_listener: Option<MioTcpListener>,
1238 ) -> Result<Token, ProxyError> {
1239 let listener = self
1240 .listeners
1241 .values()
1242 .find(|listener| listener.borrow().address == *addr)
1243 .ok_or(ProxyError::NoListenerFound(*addr))?;
1244
1245 listener.borrow_mut().activate(&self.registry, tcp_listener)
1246 }
1247
1248 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1249 self.listeners
1250 .values()
1251 .filter_map(|listener| {
1252 let mut owned = listener.borrow_mut();
1253 if let Some(listener) = owned.listener.take() {
1254 return Some((owned.address, listener));
1255 }
1256
1257 None
1258 })
1259 .collect()
1260 }
1261
1262 pub fn give_back_listener(
1263 &mut self,
1264 address: SocketAddr,
1265 ) -> Result<(Token, MioTcpListener), ProxyError> {
1266 let listener = self
1267 .listeners
1268 .values()
1269 .find(|listener| listener.borrow().address == address)
1270 .ok_or(ProxyError::NoListenerFound(address))?;
1271
1272 let mut owned = listener.borrow_mut();
1273
1274 let taken_listener = owned
1275 .listener
1276 .take()
1277 .ok_or(ProxyError::UnactivatedListener)?;
1278
1279 Ok((owned.token, taken_listener))
1280 }
1281
1282 pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1283 let address = front.address.into();
1284
1285 let mut listener = self
1286 .listeners
1287 .values()
1288 .find(|l| l.borrow().address == address)
1289 .ok_or(ProxyError::NoListenerFound(address))?
1290 .borrow_mut();
1291
1292 self.fronts
1293 .insert(front.cluster_id.to_string(), listener.token);
1294 listener.set_tags(address.to_string(), Some(front.tags));
1295 listener.cluster_id = Some(front.cluster_id);
1296 Ok(())
1297 }
1298
1299 pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
1300 let address = front.address.into();
1301
1302 let mut listener = match self
1303 .listeners
1304 .values()
1305 .find(|l| l.borrow().address == address)
1306 {
1307 Some(l) => l.borrow_mut(),
1308 None => return Err(ProxyError::NoListenerFound(address)),
1309 };
1310
1311 listener.set_tags(address.to_string(), None);
1312 if let Some(cluster_id) = listener.cluster_id.take() {
1313 self.fronts.remove(&cluster_id);
1314 }
1315 Ok(())
1316 }
1317}
1318
impl ProxyConfiguration for TcpProxy {
    /// Handles one configuration request from the main process and returns a
    /// response tagged with the same request id. Unsupported request types
    /// are logged and answered with an error.
    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
        let request_type = match message.content.request_type {
            Some(t) => t,
            None => return WorkerResponse::error(message.id, "Empty request"),
        };
        match request_type {
            RequestType::AddTcpFrontend(front) => {
                if let Err(err) = self.add_tcp_front(front) {
                    return WorkerResponse::error(message.id, err);
                }

                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveTcpFrontend(front) => {
                if let Err(err) = self.remove_tcp_front(front) {
                    return WorkerResponse::error(message.id, err);
                }

                WorkerResponse::ok(message.id)
            }
            RequestType::SoftStop(_) => {
                info!("{} processing soft shutdown", message.id);
                // Stop accepting: drop all listeners and deregister their
                // sockets (deregister errors are deliberately ignored).
                let listeners: HashMap<_, _> = self.listeners.drain().collect();
                for (_, l) in listeners.iter() {
                    l.borrow_mut()
                        .listener
                        .take()
                        .map(|mut sock| self.registry.deregister(&mut sock));
                }
                // "processing": existing sessions are left to finish.
                WorkerResponse::processing(message.id)
            }
            RequestType::HardStop(_) => {
                info!("{} hard shutdown", message.id);
                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
                for (_, l) in listeners.drain() {
                    l.borrow_mut()
                        .listener
                        .take()
                        .map(|mut sock| self.registry.deregister(&mut sock));
                }
                WorkerResponse::ok(message.id)
            }
            RequestType::Status(_) => {
                info!("{} status", message.id);
                WorkerResponse::ok(message.id)
            }
            RequestType::AddCluster(cluster) => {
                // An unrecognized proxy protocol value is silently dropped
                // (try_from failure maps to None).
                let config = ClusterConfiguration {
                    proxy_protocol: cluster
                        .proxy_protocol
                        .and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
                };
                self.configs.insert(cluster.cluster_id, config);
                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveCluster(cluster_id) => {
                self.configs.remove(&cluster_id);
                WorkerResponse::ok(message.id)
            }
            RequestType::RemoveListener(remove) => {
                if !self.remove_listener(remove.address.into()) {
                    WorkerResponse::error(
                        message.id,
                        format!("no TCP listener to remove at address {:?}", remove.address),
                    )
                } else {
                    WorkerResponse::ok(message.id)
                }
            }
            command => {
                debug!(
                    "{} unsupported message for TCP proxy, ignoring {:?}",
                    message.id, command
                );
                WorkerResponse::error(message.id, "unsupported message")
            }
        }
    }

    /// Accepts one pending connection on the listener identified by `token`.
    ///
    /// `WouldBlock` is surfaced as `AcceptError::WouldBlock` so the caller can
    /// end its accept loop; an unknown token or an unactivated listener yields
    /// `AcceptError::IoError`.
    fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
        let internal_token = Token(token.0);
        if let Some(listener) = self.listeners.get(&internal_token) {
            if let Some(tcp_listener) = &listener.borrow().listener {
                tcp_listener
                    .accept()
                    .map(|(frontend_sock, _)| frontend_sock)
                    .map_err(|e| match e.kind() {
                        ErrorKind::WouldBlock => AcceptError::WouldBlock,
                        _ => {
                            error!("accept() IO error: {:?}", e);
                            AcceptError::IoError
                        }
                    })
            } else {
                Err(AcceptError::IoError)
            }
        } else {
            Err(AcceptError::IoError)
        }
    }

    /// Builds a `TcpSession` for a freshly accepted frontend socket: checks
    /// out front/back buffers, verifies the listener is linked to a cluster,
    /// registers the socket with the event loop and stores the session in the
    /// slab. The ordering of these steps matters (buffers first, registration
    /// only after a slab token is reserved).
    fn create_session(
        &mut self,
        mut frontend_sock: MioTcpStream,
        token: ListenToken,
        wait_time: Duration,
        proxy: Rc<RefCell<Self>>,
    ) -> Result<(), AcceptError> {
        let listener_token = Token(token.0);

        let listener = self
            .listeners
            .get(&listener_token)
            .ok_or(AcceptError::IoError)?;

        let owned = listener.borrow();
        let mut pool = self.pool.borrow_mut();

        // Both buffers must be available; otherwise flip can_accept off so the
        // server stops accepting until sessions return buffers to the pool.
        let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
            (Some(fb), Some(bb)) => (fb, bb),
            _ => {
                error!("could not get buffers from pool");
                error!(
                    "Buffer capacity has been reached, stopping to accept new connections for now"
                );
                gauge!("accept_queue.backpressure", 1);
                self.sessions.borrow_mut().can_accept = false;

                return Err(AcceptError::BufferCapacityReached);
            }
        };

        if owned.cluster_id.is_none() {
            error!(
                "listener at address {:?} has no linked cluster",
                owned.address
            );
            return Err(AcceptError::IoError);
        }

        // cluster_id presence was checked just above, so unwrap cannot panic.
        let proxy_protocol = self
            .configs
            .get(owned.cluster_id.as_ref().unwrap())
            .and_then(|c| c.proxy_protocol);

        // Best effort: a failed TCP_NODELAY is logged but not fatal.
        if let Err(e) = frontend_sock.set_nodelay(true) {
            error!(
                "error setting nodelay on front socket({:?}): {:?}",
                frontend_sock, e
            );
        }

        let mut session_manager = self.sessions.borrow_mut();
        let entry = session_manager.slab.vacant_entry();
        let frontend_token = Token(entry.key());

        if let Err(register_error) = self.registry.register(
            &mut frontend_sock,
            frontend_token,
            Interest::READABLE | Interest::WRITABLE,
        ) {
            error!(
                "error registering front socket({:?}): {:?}",
                frontend_sock, register_error
            );
            return Err(AcceptError::RegisterError);
        }

        let session = TcpSession::new(
            back_buffer,
            None,
            owned.cluster_id.clone(),
            Duration::from_secs(owned.config.back_timeout as u64),
            Duration::from_secs(owned.config.connect_timeout as u64),
            Duration::from_secs(owned.config.front_timeout as u64),
            front_buffer,
            frontend_token,
            listener.clone(),
            proxy_protocol,
            proxy,
            frontend_sock,
            wait_time,
        );
        incr!("tcp.requests");

        let session = Rc::new(RefCell::new(session));
        entry.insert(session);

        Ok(())
    }
}
1512
1513pub mod testing {
1514 use crate::testing::*;
1515
1516 pub fn start_tcp_worker(
1518 config: TcpListenerConfig,
1519 max_buffers: usize,
1520 buffer_size: usize,
1521 channel: ProxyChannel,
1522 ) -> anyhow::Result<()> {
1523 let address = config.address.into();
1524
1525 let ServerParts {
1526 event_loop,
1527 registry,
1528 sessions,
1529 pool,
1530 backends,
1531 client_scm_socket: _,
1532 server_scm_socket,
1533 server_config,
1534 } = prebuild_server(max_buffers, buffer_size, true)?;
1535
1536 let token = {
1537 let mut sessions = sessions.borrow_mut();
1538 let entry = sessions.slab.vacant_entry();
1539 let key = entry.key();
1540 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1541 protocol: Protocol::TCPListen,
1542 })));
1543 Token(key)
1544 };
1545
1546 let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1547 proxy
1548 .add_listener(config, token)
1549 .with_context(|| "Failed at creating adding the listener")?;
1550 proxy
1551 .activate_listener(&address, None)
1552 .with_context(|| "Failed at creating activating the listener")?;
1553
1554 let mut server = Server::new(
1555 event_loop,
1556 channel,
1557 server_scm_socket,
1558 sessions,
1559 pool,
1560 backends,
1561 None,
1562 None,
1563 Some(proxy),
1564 server_config,
1565 None,
1566 false,
1567 )
1568 .with_context(|| "Failed at creating server")?;
1569
1570 debug!("starting event loop");
1571 server.run();
1572 debug!("ending event loop");
1573 Ok(())
1574 }
1575}
1576
#[cfg(test)]
mod tests {
    use super::testing::start_tcp_worker;
    use crate::testing::*;

    use sozu_command::proto::command::SocketAddress;
    use std::{
        io::{Read, Write},
        net::{Shutdown, TcpListener, TcpStream},
        str,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, Barrier,
        },
        thread,
    };

    use sozu_command::{
        channel::Channel,
        config::ListenerBuilder,
        proto::command::{
            request::RequestType, LoadBalancingParams, RequestTcpFrontend, WorkerRequest,
            WorkerResponse,
        },
    };

    // Tells the echo backend threads that the test is over so they stop looping.
    static TEST_FINISHED: AtomicBool = AtomicBool::new(false);

    /// End-to-end round trip: client -> proxy (port 1234) -> echo server
    /// (port 5678). Two clients interleave writes and must each read back
    /// exactly what they sent; a third client only connects and disconnects.
    #[test]
    fn round_trip() {
        setup_test_logger!();
        let barrier = Arc::new(Barrier::new(2));
        start_server(barrier.clone());
        let _tx = start_proxy().expect("Could not start proxy");
        barrier.wait();

        let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
        let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");
        let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not connect");

        // write_all (instead of write) guarantees the whole payload is sent,
        // so the echoed data matches the assertions below even on a partial
        // write.
        s1.write_all(&b"hello "[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();
        println!("s1 sent");

        s2.write_all(&b"pouet pouet"[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();

        println!("s2 sent");

        let mut res = [0; 128];
        s1.write_all(&b"coucou"[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .unwrap();

        // s3 never writes: closing it must not disturb the other sessions.
        s3.shutdown(Shutdown::Both).unwrap();
        let sz2 = s2
            .read(&mut res[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .expect("could not read from socket");
        println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
        assert_eq!(&res[..sz2], &b"pouet pouet"[..]);

        let sz1 = s1
            .read(&mut res[..])
            .map_err(|e| {
                TEST_FINISHED.store(true, Ordering::Relaxed);
                e
            })
            .expect("could not read from socket");
        println!(
            "s1 received again({}): {:?}",
            sz1,
            str::from_utf8(&res[..sz1])
        );
        assert_eq!(&res[..sz1], &b"hello coucou"[..]);
        TEST_FINISHED.store(true, Ordering::Relaxed);
    }

    /// Spawns the echo backend on 127.0.0.1:5678; `barrier` releases the
    /// accept loop once the test is ready to connect.
    fn start_server(barrier: Arc<Barrier>) {
        let listener = TcpListener::bind("127.0.0.1:5678").expect("could not bind");
        // Echoes every received chunk back until the test signals completion.
        fn handle_client(stream: &mut TcpStream, id: u8) {
            let mut buf = [0; 128];
            let _response = b" END";
            while let Ok(sz) = stream.read(&mut buf[..]) {
                if sz > 0 {
                    println!("ECHO[{}] got \"{:?}\"", id, str::from_utf8(&buf[..sz]));
                    stream.write_all(&buf[..sz]).unwrap();
                }
                if TEST_FINISHED.load(Ordering::Relaxed) {
                    println!("backend server stopping");
                    break;
                }
            }
        }

        let mut count = 0;
        thread::spawn(move || {
            barrier.wait();
            for conn in listener.incoming() {
                match conn {
                    Ok(mut stream) => {
                        thread::spawn(move || {
                            println!("got a new client: {count}");
                            handle_client(&mut stream, count)
                        });
                    }
                    Err(e) => {
                        println!("connection failed: {e:?}");
                    }
                }
                count += 1;
            }
        });
    }

    /// Starts the TCP worker on 127.0.0.1:1234 and configures two frontends of
    /// cluster "yolo", both pointing at the echo backend on port 5678.
    /// Returns the command channel used to drive the worker.
    pub fn start_proxy() -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
        let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, 1234))
            .to_tcp(None)
            .expect("could not create listener config");

        let (mut command, channel) =
            Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
        let _jg = thread::spawn(move || {
            setup_test_logger!();
            start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
        });

        command.blocking().unwrap();
        {
            let front = RequestTcpFrontend {
                cluster_id: String::from("yolo"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 1234),
                ..Default::default()
            };
            // use the same `sozu_command` path as the second block below
            let backend = sozu_command::response::Backend {
                cluster_id: String::from("yolo"),
                backend_id: String::from("yolo-0"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };

            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO1"),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .unwrap();
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO2"),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .unwrap();
        }
        {
            let front = RequestTcpFrontend {
                cluster_id: String::from("yolo"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 1235),
                ..Default::default()
            };
            let backend = sozu_command::response::Backend {
                cluster_id: String::from("yolo"),
                backend_id: String::from("yolo-0"),
                address: SocketAddress::new_v4(127, 0, 0, 1, 5678).into(),
                load_balancing_parameters: Some(LoadBalancingParams::default()),
                sticky_id: None,
                backup: None,
            };
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO3"),
                    content: RequestType::AddTcpFrontend(front).into(),
                })
                .unwrap();
            command
                .write_message(&WorkerRequest {
                    id: String::from("ID_YOLO4"),
                    content: RequestType::AddBackend(backend.to_add_backend()).into(),
                })
                .unwrap();
        }

        // Wait for the four acknowledgements before letting the test proceed.
        for _ in 0..4 {
            println!(
                "read_message: {:?}",
                command
                    .read_message()
                    .with_context(|| "could not read message")?
            );
        }

        Ok(command)
    }
}