use super::state::{ListenerState, Protocol, TASK_REGISTRY};
use crate::layers::l4::dispatcher;
use crate::resources::kv;
use fancy_log::{LogLevel, log};
use sigterm::{Shutdown, ShutdownHandle};
use tokio::net::TcpListener;
pub fn spawn_tcp_listener_task(port: u16, listener: TcpListener) -> ShutdownHandle {
let (shutdown, handle) = Shutdown::new();
let key = (port, Protocol::Tcp);
tokio::spawn(async move {
let mut shutdown_fut = std::pin::pin!(shutdown.recv());
loop {
tokio::select! {
Ok((socket, addr)) = listener.accept() => {
let client_ip: std::net::IpAddr = addr.ip();
let Some(_guard) = super::tasks::GLOBAL_TRACKER.acquire(client_ip) else {
log(LogLevel::Debug, &format!("⚙ Rate limited TCP connection from {addr} on port {port}"));
continue;
};
if let Some(task) = TASK_REGISTRY.get(&key) {
let mut state = task.state.lock().await;
if let ListenerState::Draining {..} = *state {
log(LogLevel::Debug, &format!("⚙ Rejecting new connection from {addr} on draining port {port}"));
continue;
}
*state = ListenerState::Active;
}
let server_addr = socket.local_addr().unwrap_or_else(|_| format!("0.0.0.0:{port}").parse().unwrap());
let kv_store = kv::new(&addr, &server_addr, "tcp");
log(LogLevel::Debug, &format!("⚙ Accepted TCP connection from {addr} on port {port}"));
let tcp_config = crate::config::get().listeners.get_tcp(&port.to_string());
if let Some(tcp_config) = tcp_config {
tokio::spawn(async move {
let _conn_guard = _guard;
dispatcher::dispatch_tcp_connection(socket, port, tcp_config, kv_store).await;
});
} else {
log(LogLevel::Warn, &format!("✗ TCP listener is active on port {port}, but no config found. Dropping connection from {addr}."));
}
}
_ = &mut shutdown_fut => {
log(LogLevel::Debug, &format!("⚙ TCP listener on port {port} received shutdown signal."));
break;
}
}
}
TASK_REGISTRY.remove(&key);
log(
LogLevel::Debug,
&format!("⚙ TCP listener on port {port} has shut down."),
);
});
handle
}