volli-manager 0.1.12

Manager for volli
Documentation
mod accept;
pub(crate) mod join;
mod manager;
pub(crate) mod mesh;
pub mod rate_limiter;
pub(crate) mod worker;
use crate::connection::rate_limiter::AuthBackoff;
use crate::{
    ManagerPeerEntry,
    health::HealthCollector,
    peers::{AliveTable, AliveTx},
    workers::WorkerTable,
};
pub(crate) use accept::accept_loop;
use ed25519_dalek::SigningKey;
use ipnet::IpNet;
pub(crate) use manager::{handle_manager_as_client, handle_manager_as_server};
use mesh::DialTx;
pub(crate) use mesh::spawn_mesh_runner;
use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicU64},
};
use tokio::sync::RwLock;
use volli_commands::CommandDistributor;
use volli_core::WorkerConfig;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod unit_tests;

/// Security and authentication related fields.
///
/// Every field is held behind an `Arc`, so the derived `Clone` hands out
/// additional handles to the same underlying values rather than copying them.
#[derive(Clone)]
pub(crate) struct SecurityContext {
    /// Optional Ed25519 signing key (`ed25519_dalek::SigningKey`).
    pub signing: Option<Arc<SigningKey>>,
    /// 32-byte value guarded by a Tokio `RwLock`.
    /// NOTE(review): "csk" presumably names a shared secret key — confirm.
    pub csk: Arc<RwLock<[u8; 32]>>,
    /// Atomic `u32` counter; presumably the version of `csk` — confirm.
    pub csk_ver: Arc<AtomicU32>,
}

/// Network configuration and access control.
///
/// Both lists are shared behind `Arc`, so cloning the context is cheap and
/// all clones observe the same network lists.
#[derive(Clone)]
pub(crate) struct NetworkContext {
    /// IP networks associated with workers.
    /// NOTE(review): presumably the allow-list for worker connections — confirm.
    pub worker_nets: Arc<Vec<IpNet>>,
    /// IP networks associated with managers.
    /// NOTE(review): presumably the allow-list for manager connections — confirm.
    pub manager_nets: Arc<Vec<IpNet>>,
}

/// State management for peers and workers.
#[derive(Clone)]
pub(crate) struct StateContext {
    /// Table of peers (`crate::peers::AliveTable`).
    pub peers: AliveTable,
    /// Table of workers (`crate::workers::WorkerTable`).
    pub workers: WorkerTable,
    /// Peer entry describing this manager itself; its `manager_id` is what
    /// `ManagerContext::new_peer` copies into the context.
    pub self_meta: ManagerPeerEntry,
    /// Shared atomic `u64` counter.
    /// NOTE(review): presumably bumped when the peer set changes — confirm.
    pub peer_version: Arc<AtomicU64>,
    /// Optional shared `volli_commands::CommandDistributor`.
    pub command_distributor: Option<Arc<CommandDistributor>>,
}

/// Communication channels and profile management.
#[derive(Clone)]
pub(crate) struct CommunicationContext {
    /// Sender handle of type `crate::peers::AliveTx`.
    /// NOTE(review): presumably carries peer-liveness updates — confirm.
    pub alive_tx: AliveTx,
    /// Sender handle of type `mesh::DialTx`.
    /// NOTE(review): presumably requests outbound dials from the mesh runner — confirm.
    pub dial_tx: DialTx,
    /// Name of the active profile.
    pub profile: String,
}

/// Health monitoring context for manager operations.
///
/// Cloning the context clones the `Arc`, so all clones share one collector.
#[derive(Clone)]
pub(crate) struct HealthContext {
    /// Shared health collector, guarded by a Tokio async mutex.
    pub collector: Arc<tokio::sync::Mutex<HealthCollector>>,
}

impl HealthContext {
    pub fn new(config: crate::health::HealthConfig) -> Self {
        let collector = if cfg!(test) {
            HealthCollector::with_fixed_metrics(config, Some(0.0), Some(0.0))
        } else {
            HealthCollector::new(config)
        };
        Self {
            collector: Arc::new(tokio::sync::Mutex::new(collector)),
        }
    }

    /// Create a default HealthContext for testing or basic scenarios
    #[cfg(test)]
    pub fn default() -> Self {
        use crate::health::{HealthCollector, HealthConfig};
        use std::sync::OnceLock;
        static GLOBAL: OnceLock<Arc<tokio::sync::Mutex<HealthCollector>>> = OnceLock::new();
        let arc = GLOBAL.get_or_init(|| {
            let config = HealthConfig::default();
            let collector = if cfg!(test) {
                HealthCollector::with_fixed_metrics(config, Some(0.0), Some(0.0))
            } else {
                HealthCollector::new(config)
            };
            Arc::new(tokio::sync::Mutex::new(collector))
        });
        Self {
            collector: arc.clone(),
        }
    }
}

/// Unified context for all manager operations.
///
/// Bundles the per-concern contexts together with the manager id, the
/// optional worker configuration and the authentication backoff state.
#[derive(Clone)]
pub(crate) struct ManagerContext {
    /// Keys and key-version state.
    pub security: SecurityContext,
    /// Worker/manager network lists; `new_peer` fills both with empty lists.
    pub network: NetworkContext,
    /// Peer and worker tables plus this manager's own peer entry.
    pub state: StateContext,
    /// Channel handles and the active profile name.
    pub communication: CommunicationContext,
    /// Shared health collector handle.
    pub health: HealthContext,
    /// Identifier of this manager; `new_peer` copies it from
    /// `state.self_meta.manager_id`, `new_server` takes it as an argument.
    pub manager_id: String,
    /// Worker configuration: `Some` when built via `new_peer`, `None` when
    /// built via `new_server`.
    pub worker_config: Option<WorkerConfig>,
    /// Authentication backoff state, shared behind a Tokio async mutex.
    pub auth_backoff: Arc<tokio::sync::Mutex<AuthBackoff>>,
}

impl ManagerContext {
    /// Create a new ManagerContext for server operations
    pub fn new_server(
        security: SecurityContext,
        network: NetworkContext,
        state: StateContext,
        communication: CommunicationContext,
        health: HealthContext,
        manager_id: String,
    ) -> Self {
        Self {
            security,
            network,
            state,
            communication,
            health,
            manager_id,
            worker_config: None,
            auth_backoff: Arc::new(tokio::sync::Mutex::new(AuthBackoff::new(
                std::time::Duration::from_secs(1),
                std::time::Duration::from_secs(30),
            ))),
        }
    }

    /// Create a new ManagerContext for peer operations
    pub fn new_peer(
        security: SecurityContext,
        state: StateContext,
        communication: CommunicationContext,
        worker_config: WorkerConfig,
        health: HealthContext,
    ) -> Self {
        let manager_id = state.self_meta.manager_id.clone();
        Self {
            security,
            network: NetworkContext {
                worker_nets: Arc::new(Vec::new()),
                manager_nets: Arc::new(Vec::new()),
            },
            state,
            communication,
            health,
            manager_id,
            worker_config: Some(worker_config),
            auth_backoff: Arc::new(tokio::sync::Mutex::new(AuthBackoff::new(
                std::time::Duration::from_secs(1),
                std::time::Duration::from_secs(30),
            ))),
        }
    }

    /// Get a reference to the worker config, if available
    pub fn worker_config(&self) -> Option<&WorkerConfig> {
        self.worker_config.as_ref()
    }
}