mod accept;
pub(crate) mod join;
mod manager;
pub(crate) mod mesh;
pub mod rate_limiter;
pub(crate) mod worker;
use crate::connection::rate_limiter::AuthBackoff;
use crate::{
ManagerPeerEntry,
health::HealthCollector,
peers::{AliveTable, AliveTx},
workers::WorkerTable,
};
pub(crate) use accept::accept_loop;
use ed25519_dalek::SigningKey;
use ipnet::IpNet;
pub(crate) use manager::{handle_manager_as_client, handle_manager_as_server};
use mesh::DialTx;
pub(crate) use mesh::spawn_mesh_runner;
use std::sync::{
Arc,
atomic::{AtomicU32, AtomicU64},
};
use tokio::sync::RwLock;
use volli_commands::CommandDistributor;
use volli_core::WorkerConfig;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod unit_tests;
/// Key material handed to connection handlers.
///
/// Every field is an `Arc` (or an `Option` of one), so cloning the context
/// shares the underlying values rather than copying them.
#[derive(Clone)]
pub(crate) struct SecurityContext {
/// Shared Ed25519 signing key, if one is available.
pub signing: Option<Arc<SigningKey>>,
/// 32-byte secret guarded by an async `RwLock`.
/// NOTE(review): "csk" presumably stands for a cluster/shared secret key — confirm.
pub csk: Arc<RwLock<[u8; 32]>>,
/// Atomic `u32` counter; presumably the version of the value in `csk` — confirm.
pub csk_ver: Arc<AtomicU32>,
}
/// Shared, immutable lists of IP networks associated with workers and managers.
///
/// NOTE(review): these look like allow-lists for inbound connections, but this
/// file does not show how they are consulted — confirm against the accept path.
#[derive(Clone)]
pub(crate) struct NetworkContext {
/// IP networks associated with workers.
pub worker_nets: Arc<Vec<IpNet>>,
/// IP networks associated with managers.
pub manager_nets: Arc<Vec<IpNet>>,
}
/// Handles to the manager's peer/worker tables and its own peer entry.
#[derive(Clone)]
pub(crate) struct StateContext {
/// Table of peers (see `crate::peers::AliveTable`).
pub peers: AliveTable,
/// Table of workers (see `crate::workers::WorkerTable`).
pub workers: WorkerTable,
/// This manager's own peer entry; `ManagerContext::new_peer` reads its
/// `manager_id` from here.
pub self_meta: ManagerPeerEntry,
/// Shared atomic `u64` counter.
/// NOTE(review): presumably bumped whenever the peer set changes — confirm.
pub peer_version: Arc<AtomicU64>,
/// Optional shared command distributor; `None` when command distribution is
/// not wired up for this context.
pub command_distributor: Option<Arc<CommandDistributor>>,
}
/// Sender handles and the profile name used by connection tasks.
#[derive(Clone)]
pub(crate) struct CommunicationContext {
/// Sender half declared in `crate::peers` (`AliveTx`).
pub alive_tx: AliveTx,
/// Sender half declared in the `mesh` submodule (`DialTx`).
/// NOTE(review): presumably used to request outbound dials — confirm.
pub dial_tx: DialTx,
/// Name of the active profile.
pub profile: String,
}
/// Shared handle to the health collector.
///
/// Cloning the context clones the `Arc`, so all clones observe the same
/// collector behind the same async mutex.
#[derive(Clone)]
pub(crate) struct HealthContext {
/// The collector, guarded by a `tokio::sync::Mutex`.
pub collector: Arc<tokio::sync::Mutex<HealthCollector>>,
}
impl HealthContext {
pub fn new(config: crate::health::HealthConfig) -> Self {
let collector = if cfg!(test) {
HealthCollector::with_fixed_metrics(config, Some(0.0), Some(0.0))
} else {
HealthCollector::new(config)
};
Self {
collector: Arc::new(tokio::sync::Mutex::new(collector)),
}
}
#[cfg(test)]
pub fn default() -> Self {
use crate::health::{HealthCollector, HealthConfig};
use std::sync::OnceLock;
static GLOBAL: OnceLock<Arc<tokio::sync::Mutex<HealthCollector>>> = OnceLock::new();
let arc = GLOBAL.get_or_init(|| {
let config = HealthConfig::default();
let collector = if cfg!(test) {
HealthCollector::with_fixed_metrics(config, Some(0.0), Some(0.0))
} else {
HealthCollector::new(config)
};
Arc::new(tokio::sync::Mutex::new(collector))
});
Self {
collector: arc.clone(),
}
}
}
/// Aggregate of every sub-context a connection task needs, plus identity and
/// per-context auth back-off state.
///
/// Built through `ManagerContext::new_server` or `ManagerContext::new_peer`.
#[derive(Clone)]
pub(crate) struct ManagerContext {
/// Signing key and shared-secret material.
pub security: SecurityContext,
/// Worker/manager IP network lists (`new_peer` leaves both empty).
pub network: NetworkContext,
/// Peer and worker tables plus this manager's own entry.
pub state: StateContext,
/// Sender handles and profile name.
pub communication: CommunicationContext,
/// Shared health collector handle.
pub health: HealthContext,
/// Identifier of this manager. `new_server` takes it as an argument;
/// `new_peer` copies it from `state.self_meta.manager_id`.
pub manager_id: String,
/// Worker configuration: `None` from `new_server`, `Some` from `new_peer`.
pub worker_config: Option<WorkerConfig>,
/// Auth back-off state behind an async mutex; shared by clones of this context.
pub auth_backoff: Arc<tokio::sync::Mutex<AuthBackoff>>,
}
impl ManagerContext {
pub fn new_server(
security: SecurityContext,
network: NetworkContext,
state: StateContext,
communication: CommunicationContext,
health: HealthContext,
manager_id: String,
) -> Self {
Self {
security,
network,
state,
communication,
health,
manager_id,
worker_config: None,
auth_backoff: Arc::new(tokio::sync::Mutex::new(AuthBackoff::new(
std::time::Duration::from_secs(1),
std::time::Duration::from_secs(30),
))),
}
}
pub fn new_peer(
security: SecurityContext,
state: StateContext,
communication: CommunicationContext,
worker_config: WorkerConfig,
health: HealthContext,
) -> Self {
let manager_id = state.self_meta.manager_id.clone();
Self {
security,
network: NetworkContext {
worker_nets: Arc::new(Vec::new()),
manager_nets: Arc::new(Vec::new()),
},
state,
communication,
health,
manager_id,
worker_config: Some(worker_config),
auth_backoff: Arc::new(tokio::sync::Mutex::new(AuthBackoff::new(
std::time::Duration::from_secs(1),
std::time::Duration::from_secs(30),
))),
}
}
pub fn worker_config(&self) -> Option<&WorkerConfig> {
self.worker_config.as_ref()
}
}