vane 0.9.2

A flow-based reverse proxy with multi-layer routing and programmable pipelines.
/* src/ingress/tcp.rs */

use super::state::{ListenerState, Protocol, TASK_REGISTRY};

use crate::layers::l4::dispatcher;

use crate::resources::kv;
use fancy_log::{LogLevel, log};
use sigterm::{Shutdown, ShutdownHandle};
use tokio::net::TcpListener;

/// Spawns a dedicated Tokio task to listen for TCP connections on a given port.
///
/// The spawned task loops on `listener.accept()` until the `Shutdown` receiver
/// created here resolves. For every accepted connection it:
///
/// 1. asks `GLOBAL_TRACKER` for a per-client-IP guard and skips the connection
///    when none is granted;
/// 2. skips the connection when the registry entry for this `(port, Tcp)` key
///    is in the `Draining` state, otherwise marks the entry `Active`;
/// 3. builds the connection's KV store and, if a TCP config exists for the
///    port, hands the socket to the L4 dispatcher on its own task.
///
/// Failed `accept()` calls are logged and retried after a short pause rather
/// than stopping the accept loop.
///
/// When the loop exits, the task removes its entry from `TASK_REGISTRY`.
///
/// # Arguments
/// * `port` - Port the listener is bound to; used for the registry key, config
///   lookup and log messages.
/// * `listener` - An already-bound listener; ownership moves into the task.
///
/// # Returns
/// The `ShutdownHandle` paired with the `Shutdown` receiver the task waits on.
pub fn spawn_tcp_listener_task(port: u16, listener: TcpListener) -> ShutdownHandle {
	let (shutdown, handle) = Shutdown::new();
	let key = (port, Protocol::Tcp);
	tokio::spawn(async move {
		let mut shutdown_fut = std::pin::pin!(shutdown.recv());
		loop {
			tokio::select! {
				// Bind the whole `Result`: a refutable `Ok(..)` pattern here would
				// disable this branch on the first accept error, leaving the task
				// parked on the shutdown future and never accepting again.
				accepted = listener.accept() => {
					let (socket, addr) = match accepted {
						Ok(conn) => conn,
						Err(err) => {
							log(LogLevel::Warn, &format!("✗ Failed to accept TCP connection on port {port}: {err}"));
							// Brief pause so persistent failures (e.g. file descriptor
							// exhaustion) do not turn into a hot error loop.
							tokio::time::sleep(std::time::Duration::from_millis(50)).await;
							continue;
						}
					};
					let client_ip: std::net::IpAddr = addr.ip();

					// Apply Connection Rate Limits
					let Some(conn_guard) = super::tasks::GLOBAL_TRACKER.acquire(client_ip) else {
						log(LogLevel::Debug, &format!("⚙ Rate limited TCP connection from {addr} on port {port}"));
						continue;
					};

					if let Some(task) = TASK_REGISTRY.get(&key) {
						let mut state = task.state.lock().await;
						if matches!(*state, ListenerState::Draining { .. }) {
							log(LogLevel::Debug, &format!("⚙ Rejecting new connection from {addr} on draining port {port}"));
							continue;
						}
						*state = ListenerState::Active;
					}

					// Create the KV store as soon as the connection is accepted.
					// If the local address cannot be read, fall back to the
					// unspecified IPv4 address on this port.
					let server_addr = socket.local_addr().unwrap_or_else(|_| {
						std::net::SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, port))
					});
					let kv_store = kv::new(&addr, &server_addr, "tcp");
					log(LogLevel::Debug, &format!("⚙ Accepted TCP connection from {addr} on port {port}"));

					let tcp_config = crate::config::get().listeners.get_tcp(&port.to_string());
					if let Some(tcp_config) = tcp_config {
						tokio::spawn(async move {
							// Move guard into the task so it lives as long as the connection
							let _conn_guard = conn_guard;
							dispatcher::dispatch_tcp_connection(socket, port, tcp_config, kv_store).await;
						});
					} else {
						// Socket and guard are dropped at the end of this iteration.
						log(LogLevel::Warn, &format!("✗ TCP listener is active on port {port}, but no config found. Dropping connection from {addr}."));
					}
				}
				_ = &mut shutdown_fut => {
					log(LogLevel::Debug, &format!("⚙ TCP listener on port {port} received shutdown signal."));
					break;
				}
			}
		}
		TASK_REGISTRY.remove(&key);
		log(
			LogLevel::Debug,
			&format!("⚙ TCP listener on port {port} has shut down."),
		);
	});
	handle
}