use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info;
use crate::sdk::xinet::{ProxyStatus, XinetConfig};
use crate::server::xinet::proxy::{IsRunningFn, StartServiceFn, StopServiceFn, XinetProxy};
/// Bookkeeping for one registered proxy: the proxy itself plus the task that runs it.
struct ProxyEntry {
/// Shared handle to the proxy; a clone of this `Arc` is moved into the spawned task.
proxy: Arc<XinetProxy>,
/// Handle of the Tokio task spawned to drive `XinetProxy::run`; aborted when the
/// entry is unregistered, replaced, or shut down.
handle: JoinHandle<()>,
}
/// Registry of running xinet proxies.
///
/// Each registered proxy is stored under its configuration name together with the
/// handle of the task running it. The three callbacks are cloned into every proxy
/// the manager creates.
pub struct XinetManager {
/// Registered proxies keyed by the `name` of their `XinetConfig`, guarded by an
/// async read/write lock.
proxies: Arc<RwLock<HashMap<String, ProxyEntry>>>,
/// Callback passed to each new `XinetProxy`.
/// NOTE(review): presumably starts the backing service on demand — confirm in the proxy module.
start_service: StartServiceFn,
/// Callback passed to each new `XinetProxy`.
/// NOTE(review): presumably stops the backing service — confirm in the proxy module.
stop_service: StopServiceFn,
/// Callback passed to each new `XinetProxy`.
/// NOTE(review): presumably reports whether the backing service is running — confirm in the proxy module.
is_running: IsRunningFn,
}
impl XinetManager {
pub fn new(
start_service: StartServiceFn,
stop_service: StopServiceFn,
is_running: IsRunningFn,
) -> Self {
Self {
proxies: Arc::new(RwLock::new(HashMap::new())),
start_service,
stop_service,
is_running,
}
}
pub async fn register(&self, config: XinetConfig) -> Result<(), String> {
config.validate()?;
let name = config.name.clone();
{
let mut proxies = self.proxies.write().await;
if let Some(entry) = proxies.remove(&name) {
entry.proxy.shutdown().await;
entry.handle.abort();
for listen_addr in &entry.proxy.config().listen {
if let Some(path) = listen_addr.as_unix_path() {
let _ = std::fs::remove_file(path);
}
}
info!("[xinet] Replacing existing proxy '{}'", name);
}
}
let proxy = Arc::new(XinetProxy::new(
config,
Arc::clone(&self.start_service),
Arc::clone(&self.stop_service),
Arc::clone(&self.is_running),
));
let proxy_clone = Arc::clone(&proxy);
let proxy_name = proxy.config().name.clone();
let handle = tokio::spawn(async move {
if let Err(e) = proxy_clone.run().await {
tracing::error!("[xinet:{}] Proxy error: {}", proxy_name, e);
}
});
let entry = ProxyEntry { proxy, handle };
self.proxies.write().await.insert(name, entry);
Ok(())
}
pub async fn unregister(&self, name: &str) -> Result<(), String> {
let entry = self
.proxies
.write()
.await
.remove(name)
.ok_or_else(|| format!("Proxy '{}' not found", name))?;
entry.proxy.shutdown().await;
entry.handle.abort();
for listen_addr in &entry.proxy.config().listen {
if let Some(path) = listen_addr.as_unix_path() {
let _ = std::fs::remove_file(path);
}
}
info!("[xinet] Unregistered proxy '{}'", name);
Ok(())
}
pub async fn list(&self) -> Vec<String> {
self.proxies.read().await.keys().cloned().collect()
}
pub async fn status(&self, name: &str) -> Option<ProxyStatus> {
let proxies = self.proxies.read().await;
let entry = proxies.get(name)?;
let stats = entry.proxy.stats();
let config = entry.proxy.config();
Some(ProxyStatus {
name: config.name.clone(),
listen: config.listen_addrs_string(),
backend: format!("{}", config.backend),
service: config.service.clone(),
total_connections: stats.total_connections.load(Ordering::Relaxed),
active_connections: stats.active_connections.load(Ordering::Relaxed),
bytes_to_backend: stats.bytes_to_backend.load(Ordering::Relaxed),
bytes_from_backend: stats.bytes_from_backend.load(Ordering::Relaxed),
running: !entry.handle.is_finished(),
})
}
pub async fn status_all(&self) -> Vec<ProxyStatus> {
let proxies = self.proxies.read().await;
let mut result = Vec::with_capacity(proxies.len());
for (name, entry) in proxies.iter() {
let stats = entry.proxy.stats();
let config = entry.proxy.config();
result.push(ProxyStatus {
name: name.clone(),
listen: config.listen_addrs_string(),
backend: format!("{}", config.backend),
service: config.service.clone(),
total_connections: stats.total_connections.load(Ordering::Relaxed),
active_connections: stats.active_connections.load(Ordering::Relaxed),
bytes_to_backend: stats.bytes_to_backend.load(Ordering::Relaxed),
bytes_from_backend: stats.bytes_from_backend.load(Ordering::Relaxed),
running: !entry.handle.is_finished(),
});
}
result
}
pub async fn shutdown_all(&self) {
let mut proxies = self.proxies.write().await;
for (name, entry) in proxies.drain() {
entry.proxy.shutdown().await;
entry.handle.abort();
for listen_addr in &entry.proxy.config().listen {
if let Some(path) = listen_addr.as_unix_path() {
let _ = std::fs::remove_file(path);
}
}
info!("[xinet] Shutdown proxy '{}'", name);
}
}
}