// zinit 0.3.7
//
// Process supervisor with dependency management
// Documentation
//! Xinet manager - manages multiple xinet proxies
//!
//! The manager keeps track of all registered proxies and provides
//! methods to add, remove, and query them.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info;

use crate::sdk::xinet::{ProxyStatus, XinetConfig};
use crate::server::xinet::proxy::{IsRunningFn, StartServiceFn, StopServiceFn, XinetProxy};

/// Entry for a running proxy.
///
/// Pairs the shared proxy instance with the handle of the Tokio task that
/// drives it, so the manager can both query the proxy (config, stats) and
/// stop the task that runs it.
struct ProxyEntry {
    /// Shared proxy instance; the spawned task holds another `Arc` clone of it.
    proxy: Arc<XinetProxy>,
    /// Handle of the task executing `XinetProxy::run`. Used to abort the task
    /// on removal and to report whether it has already finished.
    handle: JoinHandle<()>,
}

/// Manager for xinet proxies.
///
/// Owns the table of registered proxies and the service callbacks that are
/// handed (as `Arc` clones) to every proxy created through `register`.
pub struct XinetManager {
    /// Registered proxies, keyed by the proxy name taken from its config.
    /// Guarded by an async `RwLock` so status queries can run concurrently.
    proxies: Arc<RwLock<HashMap<String, ProxyEntry>>>,

    /// Callback to start a zinit service
    start_service: StartServiceFn,

    /// Callback to stop a zinit service
    stop_service: StopServiceFn,

    /// Callback to check if service is running
    is_running: IsRunningFn,
}

impl XinetManager {
    /// Create a new manager with service callbacks
    pub fn new(
        start_service: StartServiceFn,
        stop_service: StopServiceFn,
        is_running: IsRunningFn,
    ) -> Self {
        Self {
            proxies: Arc::new(RwLock::new(HashMap::new())),
            start_service,
            stop_service,
            is_running,
        }
    }

    /// Register and start a new proxy
    pub async fn register(&self, config: XinetConfig) -> Result<(), String> {
        config.validate()?;

        let name = config.name.clone();

        // Check if already exists
        {
            let proxies = self.proxies.read().await;
            if proxies.contains_key(&name) {
                return Err(format!("Proxy '{}' already exists", name));
            }
        }

        // Create the proxy
        let proxy = Arc::new(XinetProxy::new(
            config,
            Arc::clone(&self.start_service),
            Arc::clone(&self.stop_service),
            Arc::clone(&self.is_running),
        ));

        // Start the proxy task
        let proxy_clone = Arc::clone(&proxy);
        let proxy_name = proxy.config().name.clone();
        let handle = tokio::spawn(async move {
            if let Err(e) = proxy_clone.run().await {
                tracing::error!("[xinet:{}] Proxy error: {}", proxy_name, e);
            }
        });

        // Store the entry
        let entry = ProxyEntry { proxy, handle };
        self.proxies.write().await.insert(name, entry);

        Ok(())
    }

    /// Unregister and stop a proxy
    pub async fn unregister(&self, name: &str) -> Result<(), String> {
        let entry = self
            .proxies
            .write()
            .await
            .remove(name)
            .ok_or_else(|| format!("Proxy '{}' not found", name))?;

        // Signal shutdown
        entry.proxy.shutdown().await;

        // Abort the task
        entry.handle.abort();

        // Clean up socket files if Unix
        for listen_addr in &entry.proxy.config().listen {
            if let Some(path) = listen_addr.as_unix_path() {
                let _ = std::fs::remove_file(path);
            }
        }

        info!("[xinet] Unregistered proxy '{}'", name);
        Ok(())
    }

    /// List all registered proxy names
    pub async fn list(&self) -> Vec<String> {
        self.proxies.read().await.keys().cloned().collect()
    }

    /// Get status of a specific proxy
    pub async fn status(&self, name: &str) -> Option<ProxyStatus> {
        let proxies = self.proxies.read().await;
        let entry = proxies.get(name)?;

        let stats = entry.proxy.stats();
        let config = entry.proxy.config();

        Some(ProxyStatus {
            name: config.name.clone(),
            listen: config.listen_addrs_string(),
            backend: format!("{}", config.backend),
            service: config.service.clone(),
            total_connections: stats.total_connections.load(Ordering::Relaxed),
            active_connections: stats.active_connections.load(Ordering::Relaxed),
            bytes_to_backend: stats.bytes_to_backend.load(Ordering::Relaxed),
            bytes_from_backend: stats.bytes_from_backend.load(Ordering::Relaxed),
            running: !entry.handle.is_finished(),
        })
    }

    /// Get status of all proxies
    pub async fn status_all(&self) -> Vec<ProxyStatus> {
        let proxies = self.proxies.read().await;
        let mut result = Vec::with_capacity(proxies.len());

        for (name, entry) in proxies.iter() {
            let stats = entry.proxy.stats();
            let config = entry.proxy.config();

            result.push(ProxyStatus {
                name: name.clone(),
                listen: config.listen_addrs_string(),
                backend: format!("{}", config.backend),
                service: config.service.clone(),
                total_connections: stats.total_connections.load(Ordering::Relaxed),
                active_connections: stats.active_connections.load(Ordering::Relaxed),
                bytes_to_backend: stats.bytes_to_backend.load(Ordering::Relaxed),
                bytes_from_backend: stats.bytes_from_backend.load(Ordering::Relaxed),
                running: !entry.handle.is_finished(),
            });
        }

        result
    }

    /// Shutdown all proxies
    pub async fn shutdown_all(&self) {
        let mut proxies = self.proxies.write().await;

        for (name, entry) in proxies.drain() {
            entry.proxy.shutdown().await;
            entry.handle.abort();

            // Clean up socket files if Unix
            for listen_addr in &entry.proxy.config().listen {
                if let Some(path) = listen_addr.as_unix_path() {
                    let _ = std::fs::remove_file(path);
                }
            }

            info!("[xinet] Shutdown proxy '{}'", name);
        }
    }
}