use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, UnixListener};
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, error, info, warn};
use crate::sdk::xinet::{SocketAddr, XinetConfig};
use crate::server::xinet::connection::{Stream, forward_bidirectional, wait_for_socket};
pub type StartServiceFn = Arc<dyn Fn(&str) -> bool + Send + Sync>;
pub type StopServiceFn = Arc<dyn Fn(&str) -> bool + Send + Sync>;
pub type IsRunningFn = Arc<dyn Fn(&str) -> bool + Send + Sync>;
#[derive(Debug, Default)]
pub struct ProxyStats {
pub total_connections: AtomicU64,
pub active_connections: AtomicUsize,
pub bytes_to_backend: AtomicU64,
pub bytes_from_backend: AtomicU64,
pub last_connection: AtomicU64,
}
impl ProxyStats {
pub fn new() -> Self {
Self::default()
}
pub fn record_connection_start(&self) {
self.total_connections.fetch_add(1, Ordering::Relaxed);
self.active_connections.fetch_add(1, Ordering::Relaxed);
self.last_connection.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
Ordering::Relaxed,
);
}
pub fn record_connection_end(&self, to_backend: u64, from_backend: u64) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
self.bytes_to_backend
.fetch_add(to_backend, Ordering::Relaxed);
self.bytes_from_backend
.fetch_add(from_backend, Ordering::Relaxed);
}
}
pub struct XinetProxy {
config: XinetConfig,
stats: Arc<ProxyStats>,
start_service: StartServiceFn,
stop_service: StopServiceFn,
is_running: IsRunningFn,
connection_semaphore: Option<Arc<Semaphore>>,
last_activity: Arc<RwLock<Instant>>,
shutdown: Arc<RwLock<bool>>,
}
impl XinetProxy {
pub fn new(
config: XinetConfig,
start_service: StartServiceFn,
stop_service: StopServiceFn,
is_running: IsRunningFn,
) -> Self {
let connection_semaphore = if config.single_connection {
Some(Arc::new(Semaphore::new(1)))
} else {
None
};
Self {
config,
stats: Arc::new(ProxyStats::new()),
start_service,
stop_service,
is_running,
connection_semaphore,
last_activity: Arc::new(RwLock::new(Instant::now())),
shutdown: Arc::new(RwLock::new(false)),
}
}
pub fn config(&self) -> &XinetConfig {
&self.config
}
pub fn stats(&self) -> &Arc<ProxyStats> {
&self.stats
}
pub async fn shutdown(&self) {
*self.shutdown.write().await = true;
}
async fn ensure_service_running(&self) -> Result<(), String> {
if (self.is_running)(&self.config.service) {
return Ok(());
}
info!(
"[xinet:{}] Starting backend service '{}'",
self.config.name, self.config.service
);
if !(self.start_service)(&self.config.service) {
return Err(format!("Failed to start service '{}'", self.config.service));
}
info!(
"[xinet:{}] Waiting for backend socket {} (timeout: {}s)",
self.config.name, self.config.backend, self.config.connect_timeout
);
if !wait_for_socket(&self.config.backend, self.config.connect_timeout).await {
return Err(format!(
"Backend socket {} not available after {}s",
self.config.backend, self.config.connect_timeout
));
}
info!(
"[xinet:{}] Backend socket {} is ready",
self.config.name, self.config.backend
);
Ok(())
}
async fn touch_activity(&self) {
*self.last_activity.write().await = Instant::now();
}
async fn handle_connection(&self, client_stream: Stream) {
self.stats.record_connection_start();
self.touch_activity().await;
let result = self.do_handle_connection(client_stream).await;
match result {
Ok((to_backend, from_backend)) => {
debug!(
"[xinet:{}] Connection completed: {} bytes to backend, {} bytes from backend",
self.config.name, to_backend, from_backend
);
self.stats.record_connection_end(to_backend, from_backend);
}
Err(e) => {
warn!("[xinet:{}] Connection error: {}", self.config.name, e);
self.stats.record_connection_end(0, 0);
}
}
self.touch_activity().await;
}
async fn do_handle_connection(&self, client_stream: Stream) -> Result<(u64, u64), String> {
self.ensure_service_running().await?;
let backend_stream = Stream::connect(&self.config.backend)
.await
.map_err(|e| format!("Failed to connect to backend: {}", e))?;
let (client_read, client_write) = client_stream.into_split();
let (backend_read, backend_write) = backend_stream.into_split();
let (to_backend, from_backend) =
forward_bidirectional(client_read, client_write, backend_read, backend_write)
.await
.map_err(|e| format!("Forward error: {}", e))?;
Ok((to_backend, from_backend))
}
async fn run_idle_monitor(self: Arc<Self>) {
if self.config.idle_timeout == 0 {
return; }
let idle_duration = Duration::from_secs(self.config.idle_timeout);
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
if *self.shutdown.read().await {
break;
}
if self.stats.active_connections.load(Ordering::Relaxed) > 0 {
continue;
}
let last = *self.last_activity.read().await;
if last.elapsed() >= idle_duration {
if (self.is_running)(&self.config.service) {
info!(
"[xinet:{}] Idle timeout reached ({}s), stopping service '{}'",
self.config.name, self.config.idle_timeout, self.config.service
);
(self.stop_service)(&self.config.service);
}
}
}
}
async fn run_listener(self: Arc<Self>, addr: SocketAddr) -> Result<(), String> {
match addr {
SocketAddr::Unix(ref path) => {
if path.exists() {
let _ = std::fs::remove_file(path);
}
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let listener = UnixListener::bind(path)
.map_err(|e| format!("Failed to bind Unix socket {}: {}", path.display(), e))?;
info!("[xinet:{}] Listening on {}", self.config.name, addr);
self.accept_loop_unix(listener).await
}
SocketAddr::Tcp(ref tcp_addr) => {
let listener = TcpListener::bind(tcp_addr)
.await
.map_err(|e| format!("Failed to bind TCP socket {}: {}", tcp_addr, e))?;
info!("[xinet:{}] Listening on {}", self.config.name, addr);
self.accept_loop_tcp(listener).await
}
}
}
pub async fn run(self: Arc<Self>) -> Result<(), String> {
let idle_self = Arc::clone(&self);
tokio::spawn(async move {
idle_self.run_idle_monitor().await;
});
let listen_addrs = self.config.listen.clone();
if listen_addrs.is_empty() {
return Err("No listen addresses configured".to_string());
}
if listen_addrs.len() == 1 {
return self.run_listener(listen_addrs[0].clone()).await;
}
let mut handles = Vec::new();
for addr in listen_addrs {
let proxy = Arc::clone(&self);
let handle = tokio::spawn(async move {
if let Err(e) = proxy.run_listener(addr.clone()).await {
error!("[xinet] Listener error for {}: {}", addr, e);
}
});
handles.push(handle);
}
loop {
if *self.shutdown.read().await {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
for handle in handles {
handle.abort();
}
Ok(())
}
async fn accept_loop_unix(self: Arc<Self>, listener: UnixListener) -> Result<(), String> {
loop {
if *self.shutdown.read().await {
break;
}
let accept_result = tokio::select! {
result = listener.accept() => result,
_ = tokio::time::sleep(Duration::from_millis(100)) => continue,
};
match accept_result {
Ok((stream, _addr)) => {
info!("[xinet:{}] Accepted Unix connection", self.config.name);
let proxy = Arc::clone(&self);
let permit = if let Some(ref sem) = self.connection_semaphore {
match sem.clone().try_acquire_owned() {
Ok(p) => Some(p),
Err(_) => {
warn!(
"[xinet:{}] Connection rejected: single connection mode active",
self.config.name
);
continue;
}
}
} else {
None
};
tokio::spawn(async move {
proxy.handle_connection(Stream::Unix(stream)).await;
drop(permit);
});
}
Err(e) => {
warn!("[xinet:{}] Accept error: {}", self.config.name, e);
}
}
}
Ok(())
}
async fn accept_loop_tcp(self: Arc<Self>, listener: TcpListener) -> Result<(), String> {
loop {
if *self.shutdown.read().await {
break;
}
let accept_result = tokio::select! {
result = listener.accept() => result,
_ = tokio::time::sleep(Duration::from_millis(100)) => continue,
};
match accept_result {
Ok((stream, addr)) => {
debug!(
"[xinet:{}] Accepted connection from {}",
self.config.name, addr
);
let proxy = Arc::clone(&self);
let permit = if let Some(ref sem) = self.connection_semaphore {
match sem.clone().try_acquire_owned() {
Ok(p) => Some(p),
Err(_) => {
warn!(
"[xinet:{}] Connection rejected: single connection mode active",
self.config.name
);
continue;
}
}
} else {
None
};
tokio::spawn(async move {
proxy.handle_connection(Stream::Tcp(stream)).await;
drop(permit);
});
}
Err(e) => {
warn!("[xinet:{}] Accept error: {}", self.config.name, e);
}
}
}
Ok(())
}
}