use std::{
cell::RefCell,
collections::{BTreeMap, HashMap, hash_map::Entry},
io::ErrorKind,
net::{Shutdown, SocketAddr},
os::unix::io::AsRawFd,
rc::Rc,
time::{Duration, Instant},
};
use mio::{
Interest, Registry, Token,
net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream},
unix::SourceFd,
};
use rusty_ulid::Ulid;
use sozu_command::{
ObjectKind,
config::MAX_LOOP_ITERATIONS,
logging::{EndpointRecord, LogContext, ansi_palette},
proto::command::request::RequestType,
};
use crate::metrics::names;
use crate::{
AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
backends::{Backend, BackendMap},
pool::{Checkout, Pool},
protocol::{
Pipe,
pipe::WebSocketContext,
proxy_protocol::{
expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
},
},
retry::RetryPolicy,
server::{CONN_RETRIES, ListenToken, SessionManager, push_event},
socket::{server_bind, stats::socket_rtt},
sozu_command::{
proto::command::{
Event, EventKind, ProxyProtocolConfig, RequestTcpFrontend, TcpListenerConfig,
UpdateTcpListenerConfig, WorkerRequest, WorkerResponse,
},
ready::Ready,
state::ClusterId,
},
timer::TimeoutContainer,
};
// State machine of a TCP session, generated by `StateMachineBuilder!`.
// A session either pipes bytes between frontend and backend (`Pipe`) or is
// still handling a proxy-protocol header (send / relay / expect).
// Code in this file also matches on a `FailedUpgrade(_)` variant and calls
// `take()`, `marker()`, `failed()` and `front_socket()` on this type; those
// are presumably produced by the macro expansion — confirm in the macro.
StateMachineBuilder! {
enum TcpStateMachine {
Pipe(Pipe<MioTcpStream, TcpListener>),
SendProxyProtocol(SendProxyProtocol<MioTcpStream>),
RelayProxyProtocol(RelayProxyProtocol<MioTcpStream>),
ExpectProxyProtocol(ExpectProxyProtocol<MioTcpStream>),
}
}
/// Builds the colored log prefix of a TCP session: the session's
/// `LogContext`, a `TCP` tag, then the frontend token and the backend token
/// (`<none>` when the session has no backend token).
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Render the optional backend token up front so the `format!` call stays flat.
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{gray}{ctx}{reset}\t{open}TCP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset}, {gray}backend{reset}={white}{backend}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.log_context(),
            frontend = $self.frontend_token.0,
            backend = backend,
        )
    }};
}
/// Builds the colored `TCP` log prefix for messages that are not tied to a
/// particular session.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries (tag color and reset) are needed here.
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "{open}TCP{reset}\t >>>",
            open = palette.0,
            reset = palette.1
        )
    }};
}
/// One proxied TCP connection: a frontend socket, an optional backend socket,
/// and the protocol state machine that moves bytes between them.
pub struct TcpSession {
/// Backend buffer held by the session while a proxy-protocol state runs;
/// handed to the `Pipe` when the session upgrades.
backend_buffer: Option<Checkout>,
/// Connection status of the backend socket.
backend_connected: BackendConnectionStatus,
/// Identifier of the selected backend, written by `set_backend_id`.
backend_id: Option<String>,
/// Token the backend socket is registered under, if any.
backend_token: Option<Token>,
/// Backend whose retry policy and connection count this session updates.
/// NOTE(review): the visible code initializes this to `None` and only reads or
/// `take()`s it; no visible code stores `Some` here — confirm where it is assigned.
backend: Option<Rc<RefCell<Backend>>>,
/// Cluster this session is routed to; copied from the listener in `connect_to_backend`.
cluster_id: Option<String>,
/// Duration given to the backend timeout once the backend is connected.
configured_backend_timeout: Duration,
/// Consecutive failed backend connection attempts, compared against `CONN_RETRIES`.
connection_attempt: u8,
/// Backend timeout; created with the connect timeout, then switched to
/// `configured_backend_timeout` when the backend connects.
container_backend_timeout: TimeoutContainer,
/// Frontend timeout, bound to `frontend_token`.
container_frontend_timeout: TimeoutContainer,
/// Peer address of the frontend socket, read once at session creation.
frontend_address: Option<SocketAddr>,
/// Frontend buffer held by the session while a proxy-protocol state runs.
frontend_buffer: Option<Checkout>,
/// Token the frontend socket is registered under.
frontend_token: Token,
/// Set at the end of `close()` so that later calls return immediately.
has_been_closed: SessionIsToBeClosed,
/// Time of the most recent `update_readiness` call.
last_event: Instant,
/// Listener that accepted this session.
listener: Rc<RefCell<TcpListener>>,
/// Timing and byte counters of the session.
metrics: SessionMetrics,
/// Owning proxy; gives access to the registry, the session slab and the backends.
proxy: Rc<RefCell<TcpProxy>>,
/// ULID generated at creation; also used as the session id in log contexts.
request_id: Ulid,
/// Current protocol state.
state: TcpStateMachine,
/// True once this session was recorded with `track_cluster_ip`; makes
/// `close()` untrack it.
cluster_ip_tracked: bool,
}
impl TcpSession {
/// Creates the session for a freshly accepted frontend `socket`.
///
/// The initial state depends on `proxy_protocol`:
/// - `RelayHeader`: the relay state owns the frontend buffer, the session keeps the backend buffer;
/// - `ExpectHeader` / `SendHeader`: the session keeps both buffers until the upgrade to a pipe;
/// - `None`: a `Pipe` is built right away and owns both buffers.
///
/// The matching protocol gauge is incremented by one. The backend timeout
/// starts out empty with the connect timeout as its duration.
#[allow(clippy::too_many_arguments)]
fn new(
    backend_buffer: Checkout,
    backend_id: Option<String>,
    cluster_id: Option<String>,
    configured_backend_timeout: Duration,
    configured_connect_timeout: Duration,
    configured_frontend_timeout: Duration,
    frontend_buffer: Checkout,
    frontend_token: Token,
    listener: Rc<RefCell<TcpListener>>,
    proxy_protocol: Option<ProxyProtocolConfig>,
    proxy: Rc<RefCell<TcpProxy>>,
    socket: MioTcpStream,
    wait_time: Duration,
) -> TcpSession {
    let frontend_address = socket.peer_addr().ok();
    let request_id = Ulid::generate();

    let container_frontend_timeout =
        TimeoutContainer::new(configured_frontend_timeout, frontend_token);
    let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout);

    // Each arm yields the state plus whichever buffers the session itself must hold on to.
    let (state, held_frontend_buffer, held_backend_buffer) = match proxy_protocol {
        Some(ProxyProtocolConfig::RelayHeader) => {
            gauge_add!(names::protocol::PROXY_RELAY, 1);
            let relay = RelayProxyProtocol::new(
                socket,
                frontend_token,
                request_id,
                None,
                frontend_buffer,
            );
            (
                TcpStateMachine::RelayProxyProtocol(relay),
                None,
                Some(backend_buffer),
            )
        }
        Some(ProxyProtocolConfig::ExpectHeader) => {
            gauge_add!(names::protocol::PROXY_EXPECT, 1);
            let expect = ExpectProxyProtocol::new(
                container_frontend_timeout.clone(),
                socket,
                frontend_token,
                request_id,
            );
            (
                TcpStateMachine::ExpectProxyProtocol(expect),
                Some(frontend_buffer),
                Some(backend_buffer),
            )
        }
        Some(ProxyProtocolConfig::SendHeader) => {
            gauge_add!(names::protocol::PROXY_SEND, 1);
            let send = SendProxyProtocol::new(socket, frontend_token, request_id, None);
            (
                TcpStateMachine::SendProxyProtocol(send),
                Some(frontend_buffer),
                Some(backend_buffer),
            )
        }
        None => {
            gauge_add!(names::protocol::TCP, 1);
            let mut pipe = Pipe::new(
                backend_buffer,
                backend_id.clone(),
                None,
                None,
                None,
                None,
                cluster_id.clone(),
                frontend_buffer,
                frontend_token,
                socket,
                listener.clone(),
                Protocol::TCP,
                request_id,
                request_id,
                frontend_address,
                WebSocketContext::Tcp,
            );
            pipe.set_cluster_id(cluster_id.clone());
            (TcpStateMachine::Pipe(pipe), None, None)
        }
    };

    let metrics = SessionMetrics::new(Some(wait_time));

    TcpSession {
        backend_buffer: held_backend_buffer,
        backend_connected: BackendConnectionStatus::NotConnected,
        backend_id,
        backend_token: None,
        backend: None,
        cluster_id,
        configured_backend_timeout,
        connection_attempt: 0,
        container_backend_timeout,
        container_frontend_timeout,
        frontend_address,
        frontend_buffer: held_frontend_buffer,
        frontend_token,
        has_been_closed: false,
        last_event: Instant::now(),
        listener,
        metrics,
        proxy,
        request_id,
        state,
        cluster_ip_tracked: false,
    }
}
/// Returns the client address to attribute this session to.
///
/// Prefers the address known to the current state (the pipe's session
/// address, or the source address of a parsed proxy-protocol header) and
/// falls back to the frontend socket's peer address.
fn effective_session_address(&self) -> Option<SocketAddr> {
    let from_state = match &self.state {
        TcpStateMachine::Pipe(pipe) => pipe.get_session_address(),
        TcpStateMachine::ExpectProxyProtocol(expect) => match &expect.addresses {
            Some(addresses) => addresses.source(),
            None => None,
        },
        TcpStateMachine::RelayProxyProtocol(relay) => match &relay.addresses {
            Some(addresses) => addresses.source(),
            None => None,
        },
        TcpStateMachine::SendProxyProtocol(_) | TcpStateMachine::FailedUpgrade(_) => None,
    };

    from_state.or(self.frontend_address)
}
/// Emits the access log entry of this session.
///
/// Calls `register_end_of_session` on the metrics, then writes an access log
/// with protocol `"TCP"`. Every HTTP/TLS specific field is `None`.
fn log_request(&self) {
let listener = self.listener.borrow();
let context = self.log_context();
self.metrics.register_end_of_session(&context);
info_access!(
// counted when the access log could not be sent
on_failure: { incr!(names::access_logs::UNSENT) },
message: None,
context,
// the peer address of the frontend socket, as read at session creation
session_address: self.frontend_address,
backend_address: None,
protocol: "TCP",
endpoint: EndpointRecord::Tcp,
// tags are looked up under the listener address rendered as a string
tags: listener.get_tags(&listener.get_addr().to_string()),
client_rtt: socket_rtt(self.state.front_socket()),
server_rtt: None,
user_agent: None,
x_request_id: None,
tls_version: None,
tls_cipher: None,
tls_sni: None,
tls_alpn: None,
xff_chain: None,
service_time: self.metrics.service_time(),
response_time: self.metrics.backend_response_time(),
request_time: self.metrics.request_time(),
start_time_ns: self.metrics.start_wall_ns(),
bytes_in: self.metrics.bin,
bytes_out: self.metrics.bout,
otel: None,
);
}
/// Handles a hang-up of the frontend socket.
///
/// A `Pipe` decides by itself; in every other state the session is logged
/// and closed.
fn front_hup(&mut self) -> SessionResult {
    if let TcpStateMachine::Pipe(pipe) = &mut self.state {
        return pipe.frontend_hup(&mut self.metrics);
    }

    self.log_request();
    SessionResult::Close
}
/// Handles a hang-up of the backend socket.
///
/// A `Pipe` decides by itself; in every other state the session is logged
/// and closed.
fn back_hup(&mut self) -> SessionResult {
    if let TcpStateMachine::Pipe(pipe) = &mut self.state {
        return pipe.backend_hup(&mut self.metrics);
    }

    self.log_request();
    SessionResult::Close
}
/// Builds the structured log context of this session.
///
/// The session has a single ULID, used both as session id and request id.
fn log_context(&self) -> LogContext<'_> {
    let id = self.request_id;
    let cluster_id = self.cluster_id.as_deref();
    let backend_id = self.backend_id.as_deref();

    LogContext {
        session_id: id,
        request_id: Some(id),
        cluster_id,
        backend_id,
    }
}
/// Handles a readable frontend socket.
///
/// Resets the frontend timeout, and the backend timeout as well when the
/// backend is connected, then lets the current state read. The
/// `SendProxyProtocol` state has nothing to read and continues.
fn readable(&mut self) -> SessionResult {
    let frontend_was_reset = self.container_frontend_timeout.reset();
    if !frontend_was_reset {
        error!(
            "{} Could not reset frontend timeout on readable",
            log_context!(self)
        );
    }

    // The backend timeout is only touched once the backend is connected.
    if self.backend_connected == BackendConnectionStatus::Connected {
        let backend_was_reset = self.container_backend_timeout.reset();
        if !backend_was_reset {
            error!(
                "{} Could not reset backend timeout on readable",
                log_context!(self)
            );
        }
    }

    match &mut self.state {
        TcpStateMachine::SendProxyProtocol(_) => SessionResult::Continue,
        TcpStateMachine::ExpectProxyProtocol(expect) => expect.readable(&mut self.metrics),
        TcpStateMachine::RelayProxyProtocol(relay) => relay.readable(&mut self.metrics),
        TcpStateMachine::Pipe(pipe) => pipe.readable(&mut self.metrics),
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Handles a writable frontend socket; only a `Pipe` writes to the frontend,
/// every other state continues.
fn writable(&mut self) -> SessionResult {
    if let TcpStateMachine::Pipe(pipe) = &mut self.state {
        pipe.writable(&mut self.metrics)
    } else {
        SessionResult::Continue
    }
}
/// Handles a readable backend socket.
///
/// Resets both timeouts (logging an error for each one that could not be
/// reset), then lets a `Pipe` read from the backend; every other state
/// continues.
fn back_readable(&mut self) -> SessionResult {
    let frontend_was_reset = self.container_frontend_timeout.reset();
    if !frontend_was_reset {
        error!(
            "{} Could not reset frontend timeout on back_readable",
            log_context!(self)
        );
    }

    let backend_was_reset = self.container_backend_timeout.reset();
    if !backend_was_reset {
        error!(
            "{} Could not reset backend timeout on back_readable",
            log_context!(self)
        );
    }

    if let TcpStateMachine::Pipe(pipe) = &mut self.state {
        pipe.backend_readable(&mut self.metrics)
    } else {
        SessionResult::Continue
    }
}
/// Handles a writable backend socket by delegating to the current state.
/// The `ExpectProxyProtocol` state writes nothing to the backend and continues.
fn back_writable(&mut self) -> SessionResult {
    let metrics = &mut self.metrics;
    match &mut self.state {
        TcpStateMachine::ExpectProxyProtocol(_) => SessionResult::Continue,
        TcpStateMachine::SendProxyProtocol(send) => send.back_writable(metrics),
        TcpStateMachine::RelayProxyProtocol(relay) => relay.back_writable(metrics),
        TcpStateMachine::Pipe(pipe) => pipe.backend_writable(metrics),
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Mutable access to the backend socket held by the current state, if any.
/// The `ExpectProxyProtocol` state never holds one.
fn back_socket_mut(&mut self) -> Option<&mut MioTcpStream> {
    match &mut self.state {
        TcpStateMachine::ExpectProxyProtocol(_) => None,
        TcpStateMachine::SendProxyProtocol(send) => send.back_socket_mut(),
        TcpStateMachine::RelayProxyProtocol(relay) => relay.back_socket_mut(),
        TcpStateMachine::Pipe(pipe) => pipe.back_socket_mut(),
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Upgrades a proxy-protocol state to a `Pipe`.
///
/// The current state is taken out of the session and converted. Returns
/// `false` when the new state was installed, and `true` (session must be
/// closed) when the conversion failed or the session already was a `Pipe`.
pub fn upgrade(&mut self) -> SessionIsToBeClosed {
    let previous = self.state.take();

    let upgraded = match previous {
        TcpStateMachine::Pipe(_) => None,
        TcpStateMachine::ExpectProxyProtocol(expect) => self.upgrade_expect(expect),
        TcpStateMachine::RelayProxyProtocol(relay) => self.upgrade_relay(relay),
        TcpStateMachine::SendProxyProtocol(send) => self.upgrade_send(send),
        TcpStateMachine::FailedUpgrade(_) => todo!(),
    };

    if let Some(state) = upgraded {
        self.state = state;
        false
    } else {
        true
    }
}
/// Converts a finished `SendProxyProtocol` state into a `Pipe`.
///
/// Needs both buffers held by the session. Returns `None`, after logging an
/// error, when either is missing. On success the protocol gauges are moved
/// from "proxy send" to "tcp".
fn upgrade_send(
    &mut self,
    send_proxy_protocol: SendProxyProtocol<MioTcpStream>,
) -> Option<TcpStateMachine> {
    // Check both buffers before taking either one.
    if self.backend_buffer.is_none() || self.frontend_buffer.is_none() {
        error!(
            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
            log_context!(self)
        );
        return None;
    }

    let frontend_buffer = self.frontend_buffer.take()?;
    let backend_buffer = self.backend_buffer.take()?;

    let mut pipe =
        send_proxy_protocol.into_pipe(frontend_buffer, backend_buffer, self.listener.clone());
    pipe.set_cluster_id(self.cluster_id.clone());

    gauge_add!(names::protocol::PROXY_SEND, -1);
    gauge_add!(names::protocol::TCP, 1);

    Some(TcpStateMachine::Pipe(pipe))
}
/// Converts a finished `RelayProxyProtocol` state into a `Pipe`.
///
/// Only the backend buffer is needed from the session. Returns `None`, after
/// logging an error, when it is missing. On success the protocol gauges are
/// moved from "proxy relay" to "tcp".
fn upgrade_relay(&mut self, rpp: RelayProxyProtocol<MioTcpStream>) -> Option<TcpStateMachine> {
    let backend_buffer = match self.backend_buffer.take() {
        Some(buffer) => buffer,
        None => {
            error!(
                "{} Missing the backend buffer queue, we can't switch to a pipe",
                log_context!(self)
            );
            return None;
        }
    };

    let mut pipe = rpp.into_pipe(backend_buffer, self.listener.clone());
    pipe.set_cluster_id(self.cluster_id.clone());

    gauge_add!(names::protocol::PROXY_RELAY, -1);
    gauge_add!(names::protocol::TCP, 1);

    Some(TcpStateMachine::Pipe(pipe))
}
/// Converts a finished `ExpectProxyProtocol` state into a `Pipe`.
///
/// Needs both the frontend and the backend buffer held by the session.
/// Returns `None`, after logging an error, when either is missing; in that
/// case neither buffer is taken. On success the protocol gauges are moved
/// from "proxy expect" to "tcp".
fn upgrade_expect(
    &mut self,
    epp: ExpectProxyProtocol<MioTcpStream>,
) -> Option<TcpStateMachine> {
    // Check both buffers before taking either one, so a failed upgrade leaves
    // the session's buffers untouched.
    if self.frontend_buffer.is_none() || self.backend_buffer.is_none() {
        // Either buffer may be the missing one: say so, as `upgrade_send` does.
        error!(
            "{} Missing the frontend or backend buffer queue, we can't switch to a pipe",
            log_context!(self)
        );
        return None;
    }

    let frontend_buffer = self.frontend_buffer.take()?;
    let backend_buffer = self.backend_buffer.take()?;

    let mut pipe = epp.into_pipe(
        frontend_buffer,
        backend_buffer,
        None,
        None,
        self.listener.clone(),
    );
    pipe.set_cluster_id(self.cluster_id.clone());

    gauge_add!(names::protocol::PROXY_EXPECT, -1);
    gauge_add!(names::protocol::TCP, 1);

    Some(TcpStateMachine::Pipe(pipe))
}
/// Mutable access to the frontend readiness tracked by the current state.
fn front_readiness(&mut self) -> &mut Readiness {
    match &mut self.state {
        TcpStateMachine::ExpectProxyProtocol(expect) => &mut expect.frontend_readiness,
        TcpStateMachine::RelayProxyProtocol(relay) => &mut relay.frontend_readiness,
        TcpStateMachine::SendProxyProtocol(send) => &mut send.frontend_readiness,
        TcpStateMachine::Pipe(pipe) => &mut pipe.frontend_readiness,
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Mutable access to the backend readiness tracked by the current state.
/// Returns `None` in the `ExpectProxyProtocol` state.
fn back_readiness(&mut self) -> Option<&mut Readiness> {
    let readiness = match &mut self.state {
        TcpStateMachine::ExpectProxyProtocol(_) => return None,
        TcpStateMachine::RelayProxyProtocol(relay) => &mut relay.backend_readiness,
        TcpStateMachine::SendProxyProtocol(send) => &mut send.backend_readiness,
        TcpStateMachine::Pipe(pipe) => &mut pipe.backend_readiness,
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    };

    Some(readiness)
}
/// Hands the backend socket to the current state.
///
/// # Panics
///
/// Panics (after logging an error) in the `ExpectProxyProtocol` state, which
/// has no place to store a backend socket.
fn set_back_socket(&mut self, socket: MioTcpStream) {
    match &mut self.state {
        TcpStateMachine::ExpectProxyProtocol(_) => {
            // Rendered once; both the log line and the panic carry the same text.
            let context = log_context!(self);
            error!(
                "{} We should not set the back socket for the expect proxy protocol",
                context
            );
            panic!(
                "{} We should not set the back socket for the expect proxy protocol",
                context
            );
        }
        TcpStateMachine::RelayProxyProtocol(relay) => relay.set_back_socket(socket),
        TcpStateMachine::SendProxyProtocol(send) => send.set_back_socket(socket),
        TcpStateMachine::Pipe(pipe) => pipe.set_back_socket(socket),
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Records the backend token on the session and on the current state.
fn set_back_token(&mut self, token: Token) {
    self.backend_token = Some(token);

    match &mut self.state {
        // This state keeps no token of its own: the session-level field set
        // above is the only place to update.
        TcpStateMachine::ExpectProxyProtocol(_) => {}
        TcpStateMachine::RelayProxyProtocol(relay) => relay.set_back_token(token),
        TcpStateMachine::SendProxyProtocol(send) => send.set_back_token(token),
        TcpStateMachine::Pipe(pipe) => pipe.set_back_token(token),
        TcpStateMachine::FailedUpgrade(_) => unreachable!(),
    }
}
/// Records the backend id on the session and, when the state is a `Pipe`,
/// on the pipe as well.
fn set_backend_id(&mut self, id: String) {
    if let TcpStateMachine::Pipe(pipe) = &mut self.state {
        pipe.set_backend_id(Some(id.clone()));
    }
    self.backend_id = Some(id);
}
/// Returns the current connection status of the backend socket.
fn back_connected(&self) -> BackendConnectionStatus {
self.backend_connected
}
/// Updates the backend connection status.
///
/// When the new status is `Connected`, additionally:
/// - increments the backend connection gauges;
/// - switches the backend timeout to the configured backend duration and
///   resets the frontend timeout;
/// - tells a `SendProxyProtocol` state that the backend is connected;
/// - if the session holds a backend: emits the "backend up" metric, log and
///   event when its retry policy was down, records the connection time when
///   the previous status was `Connecting`, and marks the connection as
///   successful (failure counter reset, retry policy `succeed`).
fn set_back_connected(&mut self, status: BackendConnectionStatus) {
    let previous = std::mem::replace(&mut self.backend_connected, status);
    if status != BackendConnectionStatus::Connected {
        return;
    }

    gauge_add!(names::backend::CONNECTIONS, 1);
    gauge_add!(
        names::backend::CONNECTIONS_PER_BACKEND,
        1,
        self.cluster_id.as_deref(),
        self.metrics.backend_id.as_deref()
    );

    // Until now the backend timeout carried the connect duration.
    self.container_backend_timeout
        .set_duration(self.configured_backend_timeout);
    self.container_frontend_timeout.reset();

    if let TcpStateMachine::SendProxyProtocol(send) = &mut self.state {
        send.set_back_connected(BackendConnectionStatus::Connected);
    }

    let backend_rc = match self.backend.as_ref() {
        Some(backend_rc) => backend_rc,
        None => return,
    };
    let mut backend = backend_rc.borrow_mut();

    if backend.retry_policy.is_down() {
        incr!(
            "backend.up",
            self.cluster_id.as_deref(),
            self.metrics.backend_id.as_deref()
        );
        gauge!(
            names::backend::AVAILABLE,
            1,
            self.cluster_id.as_deref(),
            self.metrics.backend_id.as_deref()
        );
        info!(
            "{} backend server {} at {} is up",
            log_context!(self),
            backend.backend_id,
            backend.address
        );
        push_event(Event {
            kind: EventKind::BackendUp as i32,
            backend_id: Some(backend.backend_id.to_owned()),
            address: Some(backend.address.into()),
            cluster_id: None,
            metric_detail: None,
        });
    }

    if let BackendConnectionStatus::Connecting(start) = previous {
        backend.set_connection_time(Instant::now() - start);
    }

    backend.failures = 0;
    backend.retry_policy.succeed();
}
/// Detaches the backend from the session: decrements the backend's
/// connection count when a backend was held, and forgets the backend token.
fn remove_backend(&mut self) {
    let released = self.backend.take();
    self.backend_token = None;

    if let Some(backend) = released {
        backend.borrow_mut().dec_connections();
    }
}
/// Records a failed connection attempt on the backend held by the session.
///
/// Increments the backend's failure counter, notifies its retry policy and
/// counts a connection error. When this failure is the one that turns the
/// retry policy to "down", also emits the "backend down" log, metrics and event.
/// Does nothing when the session holds no backend.
fn fail_backend_connection(&mut self) {
    let backend_rc = match self.backend.as_ref() {
        Some(backend_rc) => backend_rc,
        None => return,
    };
    let mut backend = backend_rc.borrow_mut();

    backend.failures += 1;
    let was_down = backend.retry_policy.is_down();
    backend.retry_policy.fail();

    incr!(
        "backend.connections.error",
        self.cluster_id.as_deref(),
        self.metrics.backend_id.as_deref()
    );

    // Only report the transition from available to down, not every failure.
    let went_down = !was_down && backend.retry_policy.is_down();
    if !went_down {
        return;
    }

    error!(
        "{} backend server {} at {} is down",
        log_context!(self),
        backend.backend_id,
        backend.address
    );
    incr!(
        "backend.down",
        self.cluster_id.as_deref(),
        self.metrics.backend_id.as_deref()
    );
    gauge!(
        names::backend::AVAILABLE,
        0,
        self.cluster_id.as_deref(),
        self.metrics.backend_id.as_deref()
    );
    push_event(Event {
        kind: EventKind::BackendDown as i32,
        backend_id: Some(backend.backend_id.to_owned()),
        address: Some(backend.address.into()),
        cluster_id: None,
        metric_detail: None,
    });
}
pub fn test_back_socket(&mut self) -> SessionIsToBeClosed {
match self.back_socket_mut() {
Some(ref mut s) => {
let mut tmp = [0u8; 1];
let res = s.peek(&mut tmp[..]);
match res {
Ok(0) => false,
Ok(_) => true,
Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
}
}
None => false,
}
}
/// Cancels both the frontend and the backend timeout of this session.
pub fn cancel_timeouts(&mut self) {
self.container_frontend_timeout.cancel();
self.container_backend_timeout.cancel();
}
/// Processes the readiness accumulated on the session's sockets.
///
/// Steps, in order:
/// 1. backend connection handling: finish or retry a connection in progress,
///    or start one when the backend is not connected;
/// 2. frontend hang-up handling;
/// 3. an event loop that dispatches readable/writable/hup/error events to the
///    state handlers until no interesting event is left, a handler returns
///    something other than `Continue`, or `MAX_LOOP_ITERATIONS` is reached
///    (in which case the session is dumped to the logs and closed).
///
/// `session` is the shared handle of this very session; it is passed to
/// `connect_to_backend`, which stores it in the slab under the backend token.
fn ready_inner(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionResult {
let mut counter = 0;
let back_connected = self.back_connected();
if back_connected.is_connecting() {
// A hang-up while connecting, confirmed by a failed probe of the socket,
// means the attempt failed: record the failure and try again.
if self.back_readiness().unwrap().event.is_hup() && !self.test_back_socket() {
debug!(
"{} error connecting to backend, trying again",
log_context!(self)
);
self.connection_attempt += 1;
self.fail_backend_connection();
self.close_backend();
let connection_result = self.connect_to_backend(session.clone());
if let Err(err) = &connection_result {
// exhausted retries are only traced here; connect_to_backend already warns about them
match err {
BackendConnectionError::MaxConnectionRetries(_) => trace!(
"{} Error connecting to backend: {}",
log_context!(self),
err
),
_ => warn!(
"{} Error connecting to backend: {}",
log_context!(self),
err
),
}
}
if let Some(state_result) = handle_connection_result(connection_result) {
return state_result;
}
} else if self.back_readiness().unwrap().event != Ready::EMPTY {
// any other event on a connecting socket is taken as a successful connection
self.connection_attempt = 0;
self.set_back_connected(BackendConnectionStatus::Connected);
}
} else if back_connected == BackendConnectionStatus::NotConnected {
// NOTE(review): this branch does not look at the current state, and
// connect_to_backend ends in set_back_socket, which panics for the
// ExpectProxyProtocol state — confirm such sessions cannot reach this point.
let connection_result = self.connect_to_backend(session.clone());
if let Err(err) = &connection_result {
match err {
BackendConnectionError::MaxConnectionRetries(_) => trace!(
"{} Error connecting to backend: {}",
log_context!(self),
err
),
_ => warn!(
"{} Error connecting to backend: {}",
log_context!(self),
err
),
}
}
if let Some(state_result) = handle_connection_result(connection_result) {
return state_result;
}
}
// A frontend hang-up is handled before anything else; the HUP flag is only
// cleared when the state decides to continue.
if self.front_readiness().event.is_hup() {
let session_result = self.front_hup();
if session_result == SessionResult::Continue {
self.front_readiness().event.remove(Ready::HUP);
}
return session_result;
}
while counter < MAX_LOOP_ITERATIONS {
// events we both received and are interested in, for each side
let front_interest = self.front_readiness().interest & self.front_readiness().event;
let back_interest = self
.back_readiness()
.map(|r| r.interest & r.event)
.unwrap_or(Ready::EMPTY);
trace!(
"{} Frontend interest({:?}) and backend interest({:?})",
log_context!(self),
front_interest,
back_interest
);
// nothing left to do
if front_interest == Ready::EMPTY && back_interest == Ready::EMPTY {
break;
}
// The backend hung up and the frontend wants to write but is not writable
// yet: leave the loop until the frontend becomes writable.
if self
.back_readiness()
.map(|r| r.event.is_hup())
.unwrap_or(false)
&& self.front_readiness().interest.is_writable()
&& !self.front_readiness().event.is_writable()
{
break;
}
// Each handler below ends the call as soon as it returns anything but Continue.
if front_interest.is_readable() {
let session_result = self.readable();
if session_result != SessionResult::Continue {
return session_result;
}
}
if back_interest.is_writable() {
let session_result = self.back_writable();
if session_result != SessionResult::Continue {
return session_result;
}
}
if back_interest.is_readable() {
let session_result = self.back_readable();
if session_result != SessionResult::Continue {
return session_result;
}
}
if front_interest.is_writable() {
let session_result = self.writable();
if session_result != SessionResult::Continue {
return session_result;
}
}
if back_interest.is_hup() {
let session_result = self.back_hup();
if session_result != SessionResult::Continue {
return session_result;
}
}
// a frontend error always closes the session
if front_interest.is_error() {
error!(
"{} Frontend socket error, disconnecting",
log_context!(self)
);
self.front_readiness().interest = Ready::EMPTY;
if let Some(r) = self.back_readiness() {
r.interest = Ready::EMPTY;
}
return SessionResult::Close;
}
// a backend error is treated like a backend hang-up, and closes only if back_hup says so
if back_interest.is_error() && self.back_hup() == SessionResult::Close {
self.front_readiness().interest = Ready::EMPTY;
if let Some(r) = self.back_readiness() {
r.interest = Ready::EMPTY;
}
error!("{} backend socket error, disconnecting", log_context!(self));
return SessionResult::Close;
}
counter += 1;
}
// The loop ran out of iterations without draining the events: log the
// readiness of both sides, dump the session, and close it.
if counter >= MAX_LOOP_ITERATIONS {
error!(
"{} Handling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
log_context!(self),
MAX_LOOP_ITERATIONS
);
incr!(names::tcp::INFINITE_LOOP_ERROR);
let front_interest = self.front_readiness().interest & self.front_readiness().event;
let back_interest = self
.back_readiness()
.map(|r| r.interest & r.event)
.unwrap_or(Ready::EMPTY);
// cloned so the mutable borrow of self ends before the log call below
let back = self.back_readiness().cloned();
error!(
"{} readiness: front {:?} / back {:?} | front: {:?} | back: {:?} ",
log_context!(self),
self.front_readiness(),
back,
front_interest,
back_interest
);
self.print_session();
return SessionResult::Close;
}
SessionResult::Continue
}
/// Tears down the backend side of the session.
///
/// Deregisters the backend socket and removes the backend token's slab entry,
/// detaches the backend, shuts the socket down, decrements the connection
/// gauges when the backend was connected, and sets the status back to
/// `NotConnected`.
fn close_backend(&mut self) {
// Both a token and a socket are needed to deregister; the raw fd is copied
// out so the mutable borrow of the socket ends before the proxy is borrowed.
if let (Some(token), Some(fd)) = (
self.backend_token,
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
let proxy = self.proxy.borrow();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!(
"{} Error deregistering socket({:?}): {:?}",
log_context!(self),
fd,
e
);
}
proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
// clears self.backend and self.backend_token
self.remove_backend();
let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
// forget pending backend events
if let Some(r) = self.back_readiness() {
r.event = Ready::EMPTY;
}
// built before the socket is mutably borrowed, since the macro reads self
let log_context = log_context!(self);
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
// shutting down a socket that is not connected is not reported
if e.kind() != ErrorKind::NotConnected {
error!(
"{} Error closing back socket({:?}): {:?}",
log_context, sock, e
);
}
}
}
}
// mirror of the increments done in set_back_connected
if back_connected == BackendConnectionStatus::Connected {
gauge_add!(names::backend::CONNECTIONS, -1);
gauge_add!(
names::backend::CONNECTIONS_PER_BACKEND,
-1,
self.cluster_id.as_deref(),
self.metrics.backend_id.as_deref()
);
}
self.set_back_connected(BackendConnectionStatus::NotConnected);
}
/// Starts a connection to a backend of the listener's cluster.
///
/// On success the backend stream is registered under a fresh token whose slab
/// entry points to `session_rc`, the status becomes `Connecting`, and
/// `BackendConnectAction::New` is returned.
///
/// # Errors
///
/// - `NotFound(ObjectKind::TcpCluster)` when the listener has no cluster id;
/// - `MaxConnectionRetries` when `connection_attempt` reached `CONN_RETRIES`;
/// - `MaxSessionsMemory` when the session manager is at capacity;
/// - `TooManyConnectionsPerIp` when the per-(cluster, source IP) limit is hit;
/// - `Backend(_)` when no backend stream could be obtained for the cluster.
fn connect_to_backend(
&mut self,
session_rc: Rc<RefCell<dyn ProxySession>>,
) -> Result<BackendConnectAction, BackendConnectionError> {
let cluster_id = self
.listener
.borrow()
.cluster_id
.clone()
.ok_or(BackendConnectionError::NotFound(ObjectKind::TcpCluster))?;
self.cluster_id = Some(cluster_id.clone());
if self.connection_attempt >= CONN_RETRIES {
incr!(
"backend.connect.retries_exhausted",
self.cluster_id.as_deref(),
self.metrics.backend_id.as_deref()
);
warn!(
"{} Max connection attempt reached ({})",
log_context!(self),
self.connection_attempt
);
return Err(BackendConnectionError::MaxConnectionRetries(Some(
cluster_id,
)));
}
if self.proxy.borrow().sessions.borrow().at_capacity() {
return Err(BackendConnectionError::MaxSessionsMemory);
}
// per-cluster limit, if the cluster has a configuration
let cluster_max_connections_per_ip = self
.proxy
.borrow()
.configs
.get(&cluster_id)
.and_then(|c| c.max_connections_per_ip);
// The limit is checked and the session tracked only when a client address is known.
if let Some(ip) = self.effective_session_address().map(|sa| sa.ip()) {
// the Rc is cloned so the borrow of the proxy ends right away
let sessions_rc = self.proxy.borrow().sessions.clone();
let at_limit = sessions_rc.borrow().cluster_ip_at_limit(
self.frontend_token,
&cluster_id,
&ip,
cluster_max_connections_per_ip,
);
if at_limit {
debug!(
"{} per-(cluster, source-IP) limit hit for cluster {} from {}",
log_context!(self),
cluster_id,
ip
);
return Err(BackendConnectionError::TooManyConnectionsPerIp { cluster_id });
}
sessions_rc
.borrow_mut()
.track_cluster_ip(self.frontend_token, cluster_id.clone(), ip);
// remembered so that close() untracks the session
self.cluster_ip_tracked = true;
}
let (backend, mut stream) = self
.proxy
.borrow()
.backends
.borrow_mut()
.backend_from_cluster_id(&cluster_id)
.map_err(BackendConnectionError::Backend)?;
// a failure to set nodelay is logged but does not abort the connection
if let Err(e) = stream.set_nodelay(true) {
error!(
"{} Error setting nodelay on back socket({:?}): {:?}",
log_context!(self),
stream,
e
);
}
self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());
// Reserve a slab entry for the backend socket; it points to this same session.
let back_token = {
let proxy = self.proxy.borrow();
let mut s = proxy.sessions.borrow_mut();
let entry = s.slab.vacant_entry();
let back_token = Token(entry.key());
let _entry = entry.insert(session_rc.clone());
back_token
};
// a registration failure is logged but does not abort the connection
if let Err(e) = self.proxy.borrow().registry.register(
&mut stream,
back_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!(
"{} Error registering back socket({:?}): {:?}",
log_context!(self),
stream,
e
);
}
self.container_backend_timeout.set(back_token);
self.set_back_token(back_token);
self.set_back_socket(stream);
// NOTE(review): `backend` is used for its id only and is not stored in
// `self.backend`, which the retry-policy and connection-count bookkeeping
// of this session reads — confirm whether that is intended.
self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
self.metrics.backend_start();
self.set_backend_id(backend.borrow().backend_id.clone());
Ok(BackendConnectAction::New)
}
}
impl ProxySession for TcpSession {
/// Closes the session: stops the service timer, untracks the session's
/// cluster/IP entry, restores the protocol gauge, then shuts down and
/// deregisters the frontend socket and tears down the backend.
///
/// Calling it again after a complete close is a no-op. When the state
/// machine is in a failed state, only the metrics part runs: the function
/// returns before touching timeouts or sockets and without setting
/// `has_been_closed`.
fn close(&mut self) {
if self.has_been_closed {
return;
}
trace!("{} Closing TCP session", log_context!(self));
self.metrics.service_stop();
if self.cluster_ip_tracked {
self.proxy
.borrow()
.sessions
.borrow_mut()
.untrack_all_cluster_ip(self.frontend_token);
self.cluster_ip_tracked = false;
}
// undo the gauge increment made when the current state was entered
match self.state.marker() {
StateMarker::Pipe => gauge_add!(names::protocol::TCP, -1),
StateMarker::SendProxyProtocol => gauge_add!(names::protocol::PROXY_SEND, -1),
StateMarker::RelayProxyProtocol => gauge_add!(names::protocol::PROXY_RELAY, -1),
StateMarker::ExpectProxyProtocol => gauge_add!(names::protocol::PROXY_EXPECT, -1),
}
// a failed state is only counted, per marker, and nothing else is done
if self.state.failed() {
match self.state.marker() {
StateMarker::Pipe => incr!(names::tcp::UPGRADE_PIPE_FAILED),
StateMarker::SendProxyProtocol => incr!(names::tcp::UPGRADE_SEND_FAILED),
StateMarker::RelayProxyProtocol => incr!(names::tcp::UPGRADE_RELAY_FAILED),
StateMarker::ExpectProxyProtocol => incr!(names::tcp::UPGRADE_EXPECT_FAILED),
}
return;
}
self.cancel_timeouts();
let front_socket = self.state.front_socket();
if let Err(e) = front_socket.shutdown(Shutdown::Both) {
// shutting down a socket that is not connected is not reported
if e.kind() != ErrorKind::NotConnected {
error!(
"{} Error shutting down front socket({:?}): {:?}",
log_context!(self),
front_socket,
e
);
}
}
// scoped so the borrow of the proxy is released before close_backend borrows it again
{
let proxy = self.proxy.borrow();
let fd = front_socket.as_raw_fd();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!(
"{} Error deregistering front socket({:?}) while closing TCP session: {:?}",
log_context!(self),
fd,
e
);
}
proxy
.sessions
.borrow_mut()
.slab
.try_remove(self.frontend_token.0);
}
self.close_backend();
self.has_been_closed = true;
}
/// Handles a timeout fired for `token`.
///
/// Marks the matching timeout container as triggered and returns `true`
/// (close the session) when the token is the frontend or backend token of
/// this session; returns `false` for any other token.
fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
    if token == self.frontend_token {
        self.container_frontend_timeout.triggered();
        true
    } else if Some(token) == self.backend_token {
        self.container_backend_timeout.triggered();
        true
    } else {
        false
    }
}
/// Protocol of this session, always `Protocol::TCP`.
fn protocol(&self) -> Protocol {
Protocol::TCP
}
/// Records `events` received for `token`.
///
/// Updates `last_event`, notifies the metrics that waiting started, and adds
/// the events to the frontend or backend readiness depending on which token
/// matched. Events for an unknown token are dropped.
fn update_readiness(&mut self, token: Token, events: Ready) {
    trace!(
        "{} token {:?} got event {}",
        log_context!(self),
        token,
        super::ready_to_string(events)
    );

    self.last_event = Instant::now();
    self.metrics.wait_start();

    if token == self.frontend_token {
        let readiness = self.front_readiness();
        readiness.event = readiness.event | events;
        return;
    }

    if Some(token) == self.backend_token {
        if let Some(readiness) = self.back_readiness() {
            readiness.event |= events;
        }
    }
}
/// Runs the session on its pending events and tells whether it must be closed.
///
/// When the state asks for an upgrade, the upgrade is performed and, if it
/// succeeded, the session is run again right away with the new state.
/// The service timer is started before and stopped after the processing.
fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
    self.metrics.service_start();

    let must_close = match self.ready_inner(session.clone()) {
        SessionResult::Continue => false,
        SessionResult::Close => true,
        // `upgrade` returns true on failure, which short-circuits to "close"
        SessionResult::Upgrade => self.upgrade() || self.ready(session),
    };

    self.metrics.service_stop();
    must_close
}
/// Called when the proxy shuts down; always returns `true`, i.e. the session
/// is to be closed.
fn shutting_down(&mut self) -> SessionIsToBeClosed {
true
}
/// Time of the most recent `update_readiness` call (or of the session's
/// creation if none happened yet).
fn last_event(&self) -> Instant {
self.last_event
}
/// Dumps the session to the error log: state name, tokens and readiness of
/// both sides, backend status, cluster id, then the metrics.
fn print_session(&self) {
// short name of the current state
let state: String = match &self.state {
TcpStateMachine::ExpectProxyProtocol(_) => String::from("Expect"),
TcpStateMachine::SendProxyProtocol(_) => String::from("Send"),
TcpStateMachine::RelayProxyProtocol(_) => String::from("Relay"),
TcpStateMachine::Pipe(_) => String::from("TCP"),
TcpStateMachine::FailedUpgrade(marker) => format!("FailedUpgrade({marker:?})"),
};
// read-only lookups: unlike front_readiness()/back_readiness(), these do not
// panic on a failed upgrade and only need `&self`
let front_readiness = match &self.state {
TcpStateMachine::ExpectProxyProtocol(expect) => Some(&expect.frontend_readiness),
TcpStateMachine::SendProxyProtocol(send) => Some(&send.frontend_readiness),
TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.frontend_readiness),
TcpStateMachine::Pipe(pipe) => Some(&pipe.frontend_readiness),
TcpStateMachine::FailedUpgrade(_) => None,
};
let back_readiness = match &self.state {
TcpStateMachine::SendProxyProtocol(send) => Some(&send.backend_readiness),
TcpStateMachine::RelayProxyProtocol(relay) => Some(&relay.backend_readiness),
TcpStateMachine::Pipe(pipe) => Some(&pipe.backend_readiness),
TcpStateMachine::ExpectProxyProtocol(_) => None,
TcpStateMachine::FailedUpgrade(_) => None,
};
error!(
"\
{} Session ({:?})
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}\tstatus: {:?}\tcluster id: {:?}",
log_context!(self),
state,
self.frontend_token,
front_readiness,
self.backend_token,
back_readiness,
self.backend_connected,
self.cluster_id
);
error!("Metrics: {:?}", self.metrics);
}
/// Token the frontend socket of this session is registered under.
fn frontend_token(&self) -> Token {
self.frontend_token
}
}
/// A TCP listener: its configuration, the cluster it routes to, and the
/// listening socket while it is activated.
pub struct TcpListener {
/// Whether the listener is activated. The type alias is used as a plain
/// boolean here: `false` after `new`, `true` after a successful `activate`.
active: SessionIsToBeClosed,
/// Address the listener is configured on (taken from `config.address`).
address: SocketAddr,
/// Cluster that sessions of this listener connect to, if any.
cluster_id: Option<String>,
/// Listener configuration, patched in place by `update_config`.
config: TcpListenerConfig,
/// Listening socket, present once the listener has been activated.
listener: Option<MioTcpListener>,
/// Cached tags, looked up by string key.
tags: BTreeMap<String, CachedTags>,
/// Token the listening socket is registered under.
token: Token,
}
impl ListenerHandler for TcpListener {
    /// Address this listener is configured on.
    fn get_addr(&self) -> &SocketAddr {
        &self.address
    }

    /// Cached tags stored under `key`, if any.
    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
        self.tags.get(key)
    }

    /// Stores `tags` under `key`, or removes the entry when `tags` is `None`.
    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
        if let Some(tags) = tags {
            self.tags.insert(key, CachedTags::new(tags));
        } else {
            self.tags.remove(&key);
        }
    }

    /// Protocol served by this listener, always `Protocol::TCP`.
    fn protocol(&self) -> Protocol {
        Protocol::TCP
    }

    /// Configured public address, falling back to the listening address.
    fn public_address(&self) -> SocketAddr {
        match self.config.public_address {
            Some(public) => public.into(),
            None => self.address,
        }
    }
}
impl TcpListener {
    /// Creates an inactive listener for `config`, to be registered under `token`.
    /// No socket is bound until `activate` is called.
    fn new(config: TcpListenerConfig, token: Token) -> Result<TcpListener, ListenerError> {
        // read before `config` is moved into the struct
        let address = config.address.into();

        let listener = TcpListener {
            active: false,
            address,
            cluster_id: None,
            config,
            listener: None,
            tags: BTreeMap::new(),
            token,
        };
        Ok(listener)
    }

    /// Activates the listener and returns its token.
    ///
    /// Uses `tcp_listener` when provided, otherwise binds a new socket on the
    /// configured address, then registers the socket for readable events.
    /// An already active listener is left untouched.
    ///
    /// # Errors
    ///
    /// `ProxyError::BindToSocket` when binding fails,
    /// `ProxyError::RegisterListener` when the registration fails.
    pub fn activate(
        &mut self,
        registry: &Registry,
        tcp_listener: Option<MioTcpListener>,
    ) -> Result<Token, ProxyError> {
        if self.active {
            return Ok(self.token);
        }

        let mut socket = if let Some(provided) = tcp_listener {
            provided
        } else {
            let address = self.config.address.into();
            server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
        };

        registry
            .register(&mut socket, self.token, Interest::READABLE)
            .map_err(ProxyError::RegisterListener)?;

        self.listener = Some(socket);
        self.active = true;
        Ok(self.token)
    }

    /// Applies the fields that are set in `patch` to the configuration; fields
    /// left unset in the patch keep their current value.
    pub fn update_config(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), ListenerError> {
        if patch.public_address.is_some() {
            self.config.public_address = patch.public_address;
        }
        if let Some(expect_proxy) = patch.expect_proxy {
            self.config.expect_proxy = expect_proxy;
        }
        if let Some(front_timeout) = patch.front_timeout {
            self.config.front_timeout = front_timeout;
        }
        if let Some(back_timeout) = patch.back_timeout {
            self.config.back_timeout = back_timeout;
        }
        if let Some(connect_timeout) = patch.connect_timeout {
            self.config.connect_timeout = connect_timeout;
        }
        Ok(())
    }
}
/// Translates the outcome of a backend connection attempt into a session result.
///
/// - any error closes the session;
/// - a new or replaced connection returns `Continue` (the caller returns at once);
/// - a reused connection returns `None`, letting the caller carry on.
fn handle_connection_result(
    connection_result: Result<BackendConnectAction, BackendConnectionError>,
) -> Option<SessionResult> {
    let action = match connection_result {
        Ok(action) => action,
        Err(_) => return Some(SessionResult::Close),
    };

    match action {
        BackendConnectAction::Reuse => None,
        BackendConnectAction::New | BackendConnectAction::Replace => {
            Some(SessionResult::Continue)
        }
    }
}
/// Per-cluster settings kept by the TCP proxy.
#[derive(Debug)]
pub struct ClusterConfiguration {
/// Proxy-protocol mode of the cluster, if any.
proxy_protocol: Option<ProxyProtocolConfig>,
/// Optional per-source-IP connection limit for the cluster; read when a
/// session connects to a backend.
pub max_connections_per_ip: Option<u64>,
}
/// The TCP proxy: its listeners, per-cluster configuration, and the shared
/// resources sessions need (registry, session manager, buffer pool, backends).
pub struct TcpProxy {
/// Tokens indexed by a string key.
/// NOTE(review): not used in the visible part of the file — confirm what the key is.
fronts: HashMap<String, Token>,
/// Backends shared with the rest of the proxy; sessions pick their backend here.
backends: Rc<RefCell<BackendMap>>,
/// Listeners indexed by their token.
listeners: HashMap<Token, Rc<RefCell<TcpListener>>>,
/// Cluster settings indexed by cluster id.
configs: HashMap<ClusterId, ClusterConfiguration>,
/// mio registry used to register and deregister sockets.
registry: Registry,
/// Session manager, owner of the slab that maps tokens to sessions.
sessions: Rc<RefCell<SessionManager>>,
/// Buffer pool.
pool: Rc<RefCell<Pool>>,
}
impl TcpProxy {
/// Creates a TCP proxy with no listener, no frontend and no cluster settings.
///
/// The registry, session manager, buffer pool and backend map are stored as given.
pub fn new(
    registry: Registry,
    sessions: Rc<RefCell<SessionManager>>,
    pool: Rc<RefCell<Pool>>,
    backends: Rc<RefCell<BackendMap>>,
) -> TcpProxy {
    let fronts = HashMap::new();
    let listeners = HashMap::new();
    let configs = HashMap::new();
    TcpProxy {
        fronts,
        backends,
        listeners,
        configs,
        registry,
        sessions,
        pool,
    }
}
/// Registers a new, inactive listener under `token`.
///
/// # Errors
///
/// - `ProxyError::ListenerAlreadyPresent` when `token` is already used by a listener
/// - `ProxyError::AddListener` when the listener cannot be built
pub fn add_listener(
    &mut self,
    config: TcpListenerConfig,
    token: Token,
) -> Result<Token, ProxyError> {
    let slot = match self.listeners.entry(token) {
        Entry::Occupied(_) => return Err(ProxyError::ListenerAlreadyPresent),
        Entry::Vacant(slot) => slot,
    };
    let tcp_listener = TcpListener::new(config, token).map_err(ProxyError::AddListener)?;
    slot.insert(Rc::new(RefCell::new(tcp_listener)));
    Ok(token)
}
/// Drops every listener bound to `address` from the listener map.
///
/// Returns `true` when at least one listener was removed. Only the map
/// entry is removed here; `fronts` is left untouched.
pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
    let mut removed_any = false;
    self.listeners.retain(|_, candidate| {
        let keep = candidate.borrow().address != address;
        removed_any |= !keep;
        keep
    });
    removed_any
}
/// Activates the listener bound to `addr`, optionally with an already bound socket.
///
/// # Errors
///
/// - `ProxyError::NoListenerFound` when no listener has this address
/// - any error returned by `TcpListener::activate`
pub fn activate_listener(
    &self,
    addr: &SocketAddr,
    tcp_listener: Option<MioTcpListener>,
) -> Result<Token, ProxyError> {
    let found = self
        .listeners
        .values()
        .find(|candidate| candidate.borrow().address == *addr);
    match found {
        Some(shared) => shared.borrow_mut().activate(&self.registry, tcp_listener),
        None => Err(ProxyError::NoListenerFound(*addr)),
    }
}
/// Takes the socket out of every listener that holds one.
///
/// Each such listener is marked inactive, and its address is returned
/// together with its socket. Listeners without a socket are skipped.
pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
    let mut released = Vec::new();
    for shared in self.listeners.values() {
        let mut owned = shared.borrow_mut();
        if let Some(socket) = owned.listener.take() {
            owned.active = false;
            released.push((owned.address, socket));
        }
    }
    released
}
/// Takes the socket out of the listener bound to `address` and marks it inactive.
///
/// Returns the listener's token together with the socket.
///
/// # Errors
///
/// - `ProxyError::NoListenerFound` when no listener has this address
/// - `ProxyError::UnactivatedListener` when the listener holds no socket
pub fn give_back_listener(
    &mut self,
    address: SocketAddr,
) -> Result<(Token, MioTcpListener), ProxyError> {
    let shared = match self
        .listeners
        .values()
        .find(|candidate| candidate.borrow().address == address)
    {
        Some(shared) => shared,
        None => return Err(ProxyError::NoListenerFound(address)),
    };

    let mut owned = shared.borrow_mut();
    let taken = owned.listener.take();
    match taken {
        Some(socket) => {
            owned.active = false;
            Ok((owned.token, socket))
        }
        None => Err(ProxyError::UnactivatedListener),
    }
}
/// Applies a configuration patch to the listener bound to `patch.address`.
///
/// # Errors
///
/// - `ProxyError::NoListenerFound` when no listener has this address
/// - `ProxyError::ListenerActivation`, wrapping the listener error, when the
///   update is rejected
pub fn update_listener(&mut self, patch: UpdateTcpListenerConfig) -> Result<(), ProxyError> {
    let address: SocketAddr = patch.address.into();
    let shared = match self
        .listeners
        .values()
        .find(|candidate| candidate.borrow().address == address)
    {
        Some(shared) => shared,
        None => return Err(ProxyError::NoListenerFound(address)),
    };

    let outcome = shared.borrow_mut().update_config(&patch);
    outcome.map_err(|listener_error| ProxyError::ListenerActivation {
        address,
        listener_error,
    })
}
pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
let address = front.address.into();
let mut listener = self
.listeners
.values()
.find(|l| l.borrow().address == address)
.ok_or(ProxyError::NoListenerFound(address))?
.borrow_mut();
self.fronts
.insert(front.cluster_id.to_string(), listener.token);
listener.set_tags(address.to_string(), Some(front.tags));
listener.cluster_id = Some(front.cluster_id);
Ok(())
}
pub fn remove_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
let address = front.address.into();
let mut listener = match self
.listeners
.values()
.find(|l| l.borrow().address == address)
{
Some(l) => l.borrow_mut(),
None => return Err(ProxyError::NoListenerFound(address)),
};
listener.set_tags(address.to_string(), None);
if let Some(cluster_id) = listener.cluster_id.take() {
self.fronts.remove(&cluster_id);
}
Ok(())
}
}
impl ProxyConfiguration for TcpProxy {
fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
let request_type = match message.content.request_type {
Some(t) => t,
None => return WorkerResponse::error(message.id, "Empty request"),
};
match request_type {
RequestType::AddTcpFrontend(front) => {
if let Err(err) = self.add_tcp_front(front) {
return WorkerResponse::error(message.id, err);
}
WorkerResponse::ok(message.id)
}
RequestType::RemoveTcpFrontend(front) => {
if let Err(err) = self.remove_tcp_front(front) {
return WorkerResponse::error(message.id, err);
}
WorkerResponse::ok(message.id)
}
RequestType::SoftStop(_) => {
info!(
"{} {} processing soft shutdown",
log_module_context!(),
message.id
);
let listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.iter() {
l.borrow_mut()
.listener
.take()
.map(|mut sock| self.registry.deregister(&mut sock));
}
WorkerResponse::processing(message.id)
}
RequestType::HardStop(_) => {
info!("{} {} hard shutdown", log_module_context!(), message.id);
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.drain() {
l.borrow_mut()
.listener
.take()
.map(|mut sock| self.registry.deregister(&mut sock));
}
WorkerResponse::ok(message.id)
}
RequestType::Status(_) => {
info!("{} {} status", log_module_context!(), message.id);
WorkerResponse::ok(message.id)
}
RequestType::AddCluster(cluster) => {
let config = ClusterConfiguration {
proxy_protocol: cluster
.proxy_protocol
.and_then(|n| ProxyProtocolConfig::try_from(n).ok()),
max_connections_per_ip: cluster.max_connections_per_ip,
};
self.configs.insert(cluster.cluster_id, config);
WorkerResponse::ok(message.id)
}
RequestType::RemoveCluster(cluster_id) => {
self.configs.remove(&cluster_id);
WorkerResponse::ok(message.id)
}
RequestType::RemoveListener(remove) => {
if !self.remove_listener(remove.address.into()) {
WorkerResponse::error(
message.id,
format!("no TCP listener to remove at address {:?}", remove.address),
)
} else {
WorkerResponse::ok(message.id)
}
}
command => {
debug!(
"{} {} unsupported message for TCP proxy, ignoring {:?}",
log_module_context!(),
message.id,
command
);
WorkerResponse::error(message.id, "unsupported message")
}
}
}
/// Accepts one pending connection on the listener identified by `token`.
///
/// # Errors
///
/// - `AcceptError::WouldBlock` when no connection is pending
/// - `AcceptError::IoError` when the token matches no listener, the listener
///   holds no socket, or `accept()` fails for another reason (logged)
fn accept(&mut self, token: ListenToken) -> Result<MioTcpStream, AcceptError> {
    let shared = self
        .listeners
        .get(&Token(token.0))
        .ok_or(AcceptError::IoError)?;
    let owned = shared.borrow();
    let socket = owned.listener.as_ref().ok_or(AcceptError::IoError)?;

    let accepted = socket.accept();
    match accepted {
        Ok((frontend_sock, _)) => Ok(frontend_sock),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Err(AcceptError::WouldBlock),
        Err(e) => {
            error!("{} accept() IO error: {:?}", log_module_context!(), e);
            Err(AcceptError::IoError)
        }
    }
}
/// Creates a TCP session for a freshly accepted frontend socket.
///
/// Steps, in order:
/// 1. look up the listener matching `token`
/// 2. check out two buffers from the pool (front and back)
/// 3. make sure the listener is linked to a cluster
/// 4. register the socket under a new slab key
/// 5. store the session in the slab
///
/// # Errors
///
/// - `AcceptError::IoError` when the listener is unknown or has no cluster
/// - `AcceptError::BufferCapacityReached` when two buffers are not available;
///   the session manager then stops accepting (`can_accept = false`)
/// - `AcceptError::RegisterError` when the socket cannot be registered
fn create_session(
    &mut self,
    mut frontend_sock: MioTcpStream,
    token: ListenToken,
    wait_time: Duration,
    proxy: Rc<RefCell<Self>>,
) -> Result<(), AcceptError> {
    let shared_listener = match self.listeners.get(&Token(token.0)) {
        Some(shared_listener) => shared_listener,
        None => return Err(AcceptError::IoError),
    };
    let owned = shared_listener.borrow();

    // The buffers are checked out before the cluster check, so a pool
    // shortage is reported first when both conditions fail.
    let mut pool = self.pool.borrow_mut();
    let (front_buffer, back_buffer) = match (pool.checkout(), pool.checkout()) {
        (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
        _ => {
            error!("{} could not get buffers from pool", log_module_context!());
            error!(
                "{} Buffer capacity has been reached, stopping to accept new connections for now",
                log_module_context!()
            );
            gauge!(names::accept_queue::BACKPRESSURE, 1);
            self.sessions.borrow_mut().can_accept = false;
            return Err(AcceptError::BufferCapacityReached);
        }
    };

    let cluster_id = match owned.cluster_id.as_ref() {
        Some(cluster_id) => cluster_id,
        None => {
            error!(
                "{} listener at address {:?} has no linked cluster",
                log_module_context!(),
                owned.address
            );
            return Err(AcceptError::IoError);
        }
    };
    let proxy_protocol = match self.configs.get(cluster_id) {
        Some(cluster_config) => cluster_config.proxy_protocol,
        None => None,
    };

    // A failure to set TCP_NODELAY is logged but does not abort the session.
    if let Err(e) = frontend_sock.set_nodelay(true) {
        error!(
            "{} error setting nodelay on front socket({:?}): {:?}",
            log_module_context!(),
            frontend_sock,
            e
        );
    }

    let mut session_manager = self.sessions.borrow_mut();
    let entry = session_manager.slab.vacant_entry();
    let frontend_token = Token(entry.key());

    let registration = self.registry.register(
        &mut frontend_sock,
        frontend_token,
        Interest::READABLE | Interest::WRITABLE,
    );
    if let Err(register_error) = registration {
        error!(
            "{} error registering front socket({:?}): {:?}",
            log_module_context!(),
            frontend_sock,
            register_error
        );
        return Err(AcceptError::RegisterError);
    }

    let back_timeout = Duration::from_secs(owned.config.back_timeout as u64);
    let connect_timeout = Duration::from_secs(owned.config.connect_timeout as u64);
    let front_timeout = Duration::from_secs(owned.config.front_timeout as u64);

    let session = TcpSession::new(
        back_buffer,
        None,
        owned.cluster_id.clone(),
        back_timeout,
        connect_timeout,
        front_timeout,
        front_buffer,
        frontend_token,
        Rc::clone(shared_listener),
        proxy_protocol,
        proxy,
        frontend_sock,
        wait_time,
    );
    incr!(names::tcp::REQUESTS);

    entry.insert(Rc::new(RefCell::new(session)));
    Ok(())
}
}
/// Helpers to run a TCP-only worker, used by the tests.
pub mod testing {
    use crate::testing::*;

    /// Builds a server with a single TCP listener and runs its event loop.
    ///
    /// The listener described by `config` is added under a token taken from
    /// the session slab, then activated on a freshly bound socket. This call
    /// returns only once `Server::run` returns.
    ///
    /// # Errors
    ///
    /// Fails when the server parts cannot be built, when the listener cannot
    /// be added or activated, or when the server cannot be created.
    pub fn start_tcp_worker(
        config: TcpListenerConfig,
        max_buffers: usize,
        buffer_size: usize,
        channel: ProxyChannel,
    ) -> anyhow::Result<()> {
        let address = config.address.into();

        let ServerParts {
            event_loop,
            registry,
            sessions,
            pool,
            backends,
            client_scm_socket: _,
            server_scm_socket,
            server_config,
        } = prebuild_server(max_buffers, buffer_size, true)?;

        // Reserve a slab slot for the listener so its token cannot collide
        // with a session token.
        let token = {
            let mut sessions = sessions.borrow_mut();
            let entry = sessions.slab.vacant_entry();
            let key = entry.key();
            let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::TCPListen,
            })));
            Token(key)
        };

        let mut proxy = TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
        proxy
            .add_listener(config, token)
            .with_context(|| "Failed at adding the listener")?;
        proxy
            .activate_listener(&address, None)
            .with_context(|| "Failed at activating the listener")?;

        let mut server = Server::new(
            event_loop,
            channel,
            server_scm_socket,
            sessions,
            pool,
            backends,
            None,
            None,
            Some(proxy),
            server_config,
            None,
            false,
        )
        .with_context(|| "Failed at creating server")?;

        debug!("{} starting event loop", log_module_context!());
        server.run();
        debug!("{} ending event loop", log_module_context!());
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::{
io::{Read, Write},
net::{Shutdown, TcpListener, TcpStream},
str,
sync::{
Arc, Barrier,
atomic::{AtomicBool, Ordering},
},
thread,
time::Duration,
};
use sozu_command::{
channel::Channel,
config::ListenerBuilder,
proto::command::{
LoadBalancingParams, RequestTcpFrontend, SocketAddress, SoftStop, WorkerRequest,
WorkerResponse, request::RequestType,
},
};
use super::testing::start_tcp_worker;
use crate::testing::*;
/// Sends data through the proxy to an echo backend on several connections
/// and checks that each connection gets its own bytes back.
#[test]
fn round_trip() {
    setup_test_logger!();
    let barrier = Arc::new(Barrier::new(2));
    let test_finished = Arc::new(AtomicBool::new(false));
    let front_port1 = provide_port();
    let front_port2 = provide_port();
    let backend_port = start_server(barrier.clone(), test_finished.clone());
    let mut command =
        start_proxy(backend_port, front_port1, front_port2).expect("Could not start proxy");
    // Wait until the echo server thread is about to accept connections.
    barrier.wait();

    let front_addr = format!("127.0.0.1:{front_port1}");
    let mut s1 = TcpStream::connect(&front_addr).expect("could not connect");
    s1.set_read_timeout(Some(Duration::from_secs(5)))
        .expect("could not set read timeout on s1");
    let s3 = TcpStream::connect(&front_addr).expect("could not connect");
    let mut s2 = TcpStream::connect(&front_addr).expect("could not connect");
    s2.set_read_timeout(Some(Duration::from_secs(5)))
        .expect("could not set read timeout on s2");

    s1.write_all(b"hello ").expect("could not write to s1");
    println!("s1 sent");
    s2.write_all(b"pouet pouet").expect("could not write to s2");
    println!("s2 sent");

    let mut res = [0; 128];
    s1.write_all(b"coucou").expect("could not write to s1");
    s3.shutdown(Shutdown::Both).expect("could not shutdown s3");

    // TCP is a byte stream: a single `read` may return only part of the
    // echoed data, so read exactly the expected number of bytes.
    let expected_s2 = b"pouet pouet";
    let sz2 = expected_s2.len();
    s2.read_exact(&mut res[..sz2])
        .expect("could not read from socket s2");
    println!("s2 received {:?}", str::from_utf8(&res[..sz2]));
    assert_eq!(&res[..sz2], &expected_s2[..]);

    // Both writes on s1 are echoed back on the same connection, in order.
    let expected = b"hello coucou";
    let total = expected.len();
    s1.read_exact(&mut res[..total])
        .expect("could not read from socket s1");
    println!(
        "s1 received again({}): {:?}",
        total,
        str::from_utf8(&res[..total])
    );
    assert_eq!(&res[..total], &expected[..]);

    // Let the echo server threads stop, then ask the worker to shut down.
    test_finished.store(true, Ordering::Relaxed);
    command
        .write_message(&WorkerRequest {
            id: "ID_SOFTSTOP".to_owned(),
            content: RequestType::SoftStop(SoftStop {}).into(),
        })
        .expect("could not send SoftStop to sozu worker");
}
/// Starts an echo server on an ephemeral local port and returns that port.
///
/// The accept loop runs in a background thread which first waits on
/// `barrier`. Each client is served by its own thread. Both the accept loop
/// and the client handlers stop once `test_finished` is set.
fn start_server(barrier: Arc<Barrier>, test_finished: Arc<AtomicBool>) -> u16 {
    // Echoes everything received on `stream` until the peer closes, a
    // non-retryable error occurs, or the test is flagged as finished.
    fn echo_client(mut stream: TcpStream, count: u8, finished: Arc<AtomicBool>) {
        println!("got a new client: {count}");
        stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .expect("could not set read timeout on echo client");
        let mut buf = [0; 128];
        loop {
            let sz = match stream.read(&mut buf[..]) {
                Ok(0) => return,
                Ok(sz) => sz,
                Err(e) => {
                    let retryable = matches!(
                        e.kind(),
                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                    );
                    if !retryable {
                        return;
                    }
                    // A read timeout is the opportunity to notice the end of the test.
                    if finished.load(Ordering::Relaxed) {
                        println!("backend server stopping (client handler)");
                        return;
                    }
                    continue;
                }
            };
            println!("ECHO[{count}] got \"{:?}\"", str::from_utf8(&buf[..sz]));
            stream
                .write_all(&buf[..sz])
                .expect("could not echo data back");
        }
    }

    let listener =
        TcpListener::bind("127.0.0.1:0").expect("could not bind echo server listener");
    let port = listener
        .local_addr()
        .expect("could not get echo server local address")
        .port();
    // Non-blocking accept lets the loop poll the `test_finished` flag.
    listener
        .set_nonblocking(true)
        .expect("could not set echo server listener to non-blocking");

    thread::spawn(move || {
        barrier.wait();
        let mut count: u8 = 0;
        loop {
            let accepted = listener.accept();
            match accepted {
                Ok((stream, _)) => {
                    let finished = Arc::clone(&test_finished);
                    thread::spawn(move || echo_client(stream, count, finished));
                    count = count.wrapping_add(1);
                }
                Err(e) => {
                    if e.kind() != std::io::ErrorKind::WouldBlock {
                        println!("connection failed: {e:?}");
                        continue;
                    }
                    if test_finished.load(Ordering::Relaxed) {
                        println!("backend server stopping (accept loop)");
                        break;
                    }
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }
    });

    port
}
/// Starts a TCP worker in a background thread and configures it.
///
/// A single listener is created on `front_port1`. Four requests are then
/// sent: a TCP frontend on `front_port1`, the backend, a TCP frontend on
/// `front_port2`, and the backend again. The four responses are read and
/// printed but not checked.
///
/// Returns the command side of the channel, set to blocking mode.
fn start_proxy(
    backend_port: u16,
    front_port1: u16,
    front_port2: u16,
) -> anyhow::Result<Channel<WorkerRequest, WorkerResponse>> {
    let config = ListenerBuilder::new_tcp(SocketAddress::new_v4(127, 0, 0, 1, front_port1))
        .to_tcp(None)
        .expect("could not create listener config");

    let (mut command, channel) =
        Channel::generate(1000, 10000).with_context(|| "should create a channel")?;
    let _jg = thread::spawn(move || {
        setup_test_logger!();
        start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server");
    });

    command
        .blocking()
        .expect("could not set command channel to blocking");

    // The same backend description is sent after each frontend.
    let make_backend = || sozu_command::response::Backend {
        cluster_id: "yolo".to_owned(),
        backend_id: "yolo-0".to_owned(),
        address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: None,
        backup: None,
    };

    {
        let front = RequestTcpFrontend {
            cluster_id: "yolo".to_owned(),
            address: SocketAddress::new_v4(127, 0, 0, 1, front_port1),
            ..Default::default()
        };
        command
            .write_message(&WorkerRequest {
                id: "ID_YOLO1".to_owned(),
                content: RequestType::AddTcpFrontend(front).into(),
            })
            .expect("could not send AddTcpFrontend for front1");
        command
            .write_message(&WorkerRequest {
                id: "ID_YOLO2".to_owned(),
                content: RequestType::AddBackend(make_backend().to_add_backend()).into(),
            })
            .expect("could not send AddBackend for front1");
    }

    {
        // NOTE(review): this function creates no listener on `front_port2`,
        // so the worker presumably answers this frontend request with an
        // error; the response is only printed below.
        let front = RequestTcpFrontend {
            cluster_id: "yolo".to_owned(),
            address: SocketAddress::new_v4(127, 0, 0, 1, front_port2),
            ..Default::default()
        };
        command
            .write_message(&WorkerRequest {
                id: "ID_YOLO3".to_owned(),
                content: RequestType::AddTcpFrontend(front).into(),
            })
            .expect("could not send AddTcpFrontend for front2");
        command
            .write_message(&WorkerRequest {
                id: "ID_YOLO4".to_owned(),
                content: RequestType::AddBackend(make_backend().to_add_backend()).into(),
            })
            .expect("could not send AddBackend for front2");
    }

    // One response is expected per request sent above.
    for _ in 0..4 {
        println!(
            "read_message: {:?}",
            command
                .read_message()
                .with_context(|| "could not read message")?
        );
    }

    Ok(command)
}
}