#![cfg_attr(test, allow(unused_crate_dependencies))]
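//! Manager-side runtime for the volli mesh: key and certificate bootstrap,
//! TCP/TLS and QUIC listeners, peer gossip, health monitoring, and an optional
//! Unix control socket.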
use base64::Engine;
use base64::engine::general_purpose::STANDARD_NO_PAD;
pub use bootstrap::init_csk;
use bootstrap::{CertInit, SigningInit, init_cert, init_signing, setup_quic, setup_tls_acceptor};
pub use bootstrap::{join_secret, load_or_generate_cert};
pub use config::ServerConfigOpts;
use config::parse_nets;
use connection::{ManagerContext, accept_loop, join, spawn_mesh_runner};
use eyre::Report;
pub use join::JoinConfig;
pub use keys::{
ManagerProfileBuilder, ManagerProfileExport, add_peer, bootstrap_keypair, default_secret_dir,
delete_profile, export_profile, import_profile, list_profiles, load_bind_host, load_bootstrap,
load_manager_name, load_manager_whitelist, load_max_workers, load_peer_version, load_peers,
load_peers_for_gossip, load_profile_host, load_quic_port, load_signing_key, load_tcp_port,
load_verifying_key, load_worker_whitelist, profile_exists, remove_peer, rename_profile,
save_bind_host, save_bootstrap, save_manager_name, save_manager_whitelist, save_max_workers,
save_peer_version, save_peers, save_profile_host, save_quic_port, save_tcp_port,
save_worker_whitelist, secret_dir, update_profile,
};
use nu_ansi_term::Color;
use peers::{AliveTable, sweep_dead};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::{Mutex, RwLock};
use tokio::{net::TcpListener, sync::broadcast};
use tracing::{error, info, warn};
#[cfg(unix)]
use util::short_id;
pub use util::sleep_backoff;
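/// Toggles suppression of sleep notifications. This only has an effect in
/// `cfg(test)` builds; elsewhere the flag is deliberately ignored.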
pub fn ignore_sleep_notify(ignore: bool) {
let _ = ignore;
#[cfg(test)]
{
util::set_ignore_test_sleep_notify(ignore);
}
}
use volli_commands::CommandDistributor;
pub use volli_core::{ConnectionState, ManagerPeerEntry, Message};
pub use volli_transport::MessageTransportExt;
pub use connection::mesh::try_connect_with_fallback;
mod bootstrap;
mod config;
mod connection;
mod control;
pub mod health;
mod peers;
pub mod test_harness;
mod util;
pub mod workers;
pub use connection::rate_limiter::RateLimiter;
#[cfg(test)]
mod tests;
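/// Joins an existing mesh as a client, using `token` to authenticate against
/// `join_peer` and persisting the result under `profile`.
///
/// A minimal usage sketch (the peer entry, token, and config are illustrative
/// values obtained elsewhere):
/// ```ignore
/// let joined = join(&peer_entry, &token, "default", &join_config).await?;
/// println!("joined mesh via {}", joined.manager_name);
/// ```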
pub async fn join(
join_peer: &ManagerPeerEntry,
token: &str,
profile: &str,
config: &JoinConfig<'_>,
) -> Result<ManagerPeerEntry, Report> {
join::join_as_client(join_peer, token, profile, config).await
}
pub mod keys;
pub mod mesh;
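/// Boots the manager: loads or generates signing keys, the cluster secret key,
/// and TLS certificates; binds the TCP and QUIC listeners; spawns the mesh,
/// health, signal, and control-socket background tasks; then runs the accept
/// loop until it exits.
///
/// `ready_signal`, when provided, fires once startup completes (or immediately
/// when `test_disable_listeners` is set). A minimal sketch, assuming a
/// `ServerConfigOpts` built elsewhere (e.g. from CLI flags):
/// ```ignore
/// let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
/// let server = tokio::spawn(run(opts, Some(ready_tx)));
/// ready_rx.await?; // listeners are bound; safe to connect
/// ```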
pub async fn run(
mut cfg: ServerConfigOpts,
ready_signal: Option<tokio::sync::oneshot::Sender<()>>,
) -> Result<(), Report> {
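    // In fast-test environments (VOLLI_FAST_TESTS set to anything but "0", or
    // any nextest run), clamp the handshake timeout so failures surface quickly.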
if std::env::var("VOLLI_FAST_TESTS")
.map(|v| v != "0")
.unwrap_or_else(|_| std::env::var("NEXTEST").is_ok())
&& cfg.handshake_timeout_ms > 200
{
cfg.handshake_timeout_ms = 200;
}
if cfg.test_disable_listeners {
if let Some(signal) = ready_signal {
let _ = signal.send(());
}
return Ok(());
}
let mut show_bootstrap = false;
let secret_base = cfg
.secret_dir
.as_deref()
.map(std::path::PathBuf::from)
.unwrap_or_else(default_secret_dir);
let secret_dir: &std::path::Path = &secret_base;
let SigningInit {
key: signing,
newly_generated: persist_keys,
sk_path,
pk_path,
id: manager_id,
fingerprint: pub_fp,
} = init_signing(secret_dir, &mut show_bootstrap)?;
let (csk, csk_ver, persist_csk) = bootstrap::init_csk(&cfg.profile)?;
let csk_shared = Arc::new(RwLock::new(csk));
let csk_ver_shared = Arc::new(AtomicU32::new(csk_ver));
let CertInit {
chain: cert_chain,
key,
fingerprint,
cert_der,
key_der,
cert_path,
key_path,
newly_generated: persist_cert,
} = init_cert(&cfg, secret_dir)?;
let quic_endpoint = setup_quic(&cfg.bind, cfg.quic_port, &cert_chain, &key)?;
let quic_port = quic_endpoint.local_addr()?.port();
let tls_acceptor = setup_tls_acceptor(&cert_chain, &key)?;
let tcp_listener = TcpListener::bind((cfg.bind.as_str(), cfg.tcp_port)).await?;
let tcp_port = tcp_listener.local_addr()?.port();
cfg.quic_port = quic_port;
cfg.tcp_port = tcp_port;
if let Err(e) = crate::keys::save_quic_port(&cfg.profile, quic_port) {
tracing::warn!("Failed to save QUIC port to profile: {}", e);
}
if let Err(e) = crate::keys::save_tcp_port(&cfg.profile, tcp_port) {
tracing::warn!("Failed to save TCP port to profile: {}", e);
}
let manager_name = cfg
.manager_name
.clone()
.or_else(|| load_manager_name(&cfg.profile).ok().flatten())
.unwrap_or_else(|| cfg.profile.clone());
if let Err(e) = crate::keys::save_manager_name(&cfg.profile, &manager_name) {
tracing::warn!("Failed to save manager name to profile: {}", e);
}
let self_meta = ManagerPeerEntry {
manager_id: manager_id.clone(),
manager_name,
tenant: "self".into(),
cluster: "default".into(),
host: cfg.advertise_host.clone(),
tcp_port: cfg.tcp_port,
quic_port: cfg.quic_port,
pub_fp: hex::encode(pub_fp),
csk_ver,
tls_cert: STANDARD_NO_PAD.encode(&cert_der),
tls_fp: fingerprint.clone(),
        health: None,
    };
let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
let workers: workers::WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let peer_version = Arc::new(AtomicU64::new(1));
if let Ok(ver) = keys::load_peer_version(&cfg.profile)
&& ver > 0
{
peer_version.store(ver, Ordering::SeqCst);
}
let (alive_tx, _) = broadcast::channel(256);
tokio::spawn(sweep_dead(
peers.clone(),
cfg.profile.clone(),
alive_tx.clone(),
peer_version.clone(),
));
let health_context = connection::HealthContext::new(crate::health::HealthConfig {
max_workers: cfg.max_workers,
..Default::default()
});
let dial_tx = spawn_mesh_runner(
&cfg,
self_meta.clone(),
peers.clone(),
peer_version.clone(),
alive_tx.clone(),
csk_shared.clone(),
csk_ver_shared.clone(),
workers.clone(),
health_context.clone(),
);
setup_panic_handler(&cfg.profile);
log_system_info(&cfg);
#[cfg(unix)]
spawn_state_signal_handler(
peers.clone(),
workers.clone(),
health_context.clone(),
cfg.profile.clone(),
);
#[cfg(unix)]
spawn_shutdown_signal_handlers(cfg.profile.clone());
spawn_health_monitor(
cfg.profile.clone(),
peers.clone(),
workers.clone(),
health_context.clone(),
peer_version.clone(),
alive_tx.clone(),
);
let control_sock =
volli_core::util::runtime_socket_path_for_dir(secret_dir, "manager", &cfg.profile);
control::maybe_spawn_control_server(
control_sock,
control::ControlContext {
health: health_context.clone(),
peers: peers.clone(),
workers: workers.clone(),
profile: cfg.profile.clone(),
self_id: manager_id.clone(),
peer_version: peer_version.clone(),
alive_tx: alive_tx.clone(),
},
);
let active_connections = Arc::new(AtomicUsize::new(0));
let worker_nets = parse_nets(&cfg.worker_whitelist);
let manager_nets = parse_nets(&cfg.manager_whitelist);
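    // Prefer an operator-supplied token; otherwise self-issue a bootstrap token
    // (86_400 s = 24 h) scoped to this manager's advertised endpoints and cert.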
let token = if let Some(t) = cfg.token.take() {
volli_core::token::decode_token(&t)?
} else {
volli_core::token::issue_bootstrap_token(
&csk,
"self",
"default",
"*",
86_400,
&cfg.advertise_host,
cfg.quic_port,
cfg.tcp_port,
cert_der.clone(),
)?
};
let secret_encoded = volli_core::token::encode_token(&token)?;
info!(
"Server listening on {} TCP:{} QUIC:{}",
cfg.bind, cfg.tcp_port, cfg.quic_port
);
if show_bootstrap {
println!("Worker bootstrap command:\n volli worker --join {secret_encoded}");
println!(
"Hint: run 'volli admin manager-token --manager {}' to get a manager join command",
cfg.profile
);
}
let csk_final = *csk_shared.read().await;
let ver_final = csk_ver_shared.load(Ordering::SeqCst);
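    // Persist whatever material was generated this run (signing key, TLS
    // certificate, cluster key) to the profile in a single batch.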
update_profile(&cfg.profile)?
.signing_key(&signing, persist_keys, sk_path.clone(), pk_path.clone())
.certificate(
&cert_der,
&key_der,
persist_cert,
cert_path.clone(),
key_path.clone(),
)
.cluster_key(&csk_final, ver_final, persist_csk)
.secret_dir(secret_dir.to_path_buf())
.save_all()?;
if let Some(signal) = ready_signal {
let _ = signal.send(());
}
let command_distributor = Arc::new(CommandDistributor::new(manager_id.clone()));
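    // The server context is assembled from four sub-contexts: cryptographic
    // material, network whitelists, shared mutable state, and channels.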
let ctx = ManagerContext::new_server(
connection::SecurityContext {
signing: Some(signing),
csk: csk_shared.clone(),
csk_ver: csk_ver_shared.clone(),
},
connection::NetworkContext {
worker_nets,
manager_nets,
},
connection::StateContext {
peers,
workers,
self_meta,
peer_version,
command_distributor: Some(command_distributor),
},
connection::CommunicationContext {
alive_tx,
dial_tx,
profile: cfg.profile.clone(),
},
health_context.clone(),
manager_id,
);
let profile_for_logging = cfg.profile.clone();
let quic_port_for_logging = cfg.quic_port;
let result = accept_loop(
ctx,
cfg,
tcp_listener,
tls_acceptor,
quic_endpoint,
active_connections,
)
.await;
    match &result {
        Ok(_) => info!(
            "Manager {} server loop exited normally",
            profile_for_logging
        ),
        Err(e) => {
            error!("Manager {} server loop crashed: {}", profile_for_logging, e);
            error!("Error chain: {:?}", e);
            // Render the error once and match on the common OS error strings.
            let msg = e.to_string();
            if msg.contains("Permission denied") {
                error!(
                    "Hint: Check if port {} is already in use or requires elevated privileges",
                    quic_port_for_logging
                );
            } else if msg.contains("Address already in use") {
                error!(
                    "Hint: Another process is already using port {}",
                    quic_port_for_logging
                );
            } else if msg.contains("No such file or directory") {
                error!("Hint: Check if certificates or key files exist and are readable");
            }
        }
    }
result
}
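/// Installs a global panic hook that logs the panic message, its source
/// location when available, and a forced backtrace before the process dies.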
fn setup_panic_handler(profile: &str) {
    let profile = profile.to_string();
    std::panic::set_hook(Box::new(move |panic_info| {
        let backtrace = std::backtrace::Backtrace::force_capture();
        // Extract the panic message once; payloads are either &str or String.
        let message = if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = panic_info.payload().downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic".to_string()
        };
        if let Some(location) = panic_info.location() {
            error!(
                "PANIC in manager {}: {} at {}:{}:{}",
                profile,
                message,
                location.file(),
                location.line(),
                location.column()
            );
        } else {
            error!("PANIC in manager {}: {}", profile, message);
        }
        error!("Backtrace:\n{}", backtrace);
        // Give the tracing subscriber a moment to flush before the process dies.
        std::thread::sleep(std::time::Duration::from_millis(100));
    }));
}
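/// Logs the effective configuration plus process-level details (file
/// descriptor limit on Unix, PID) at startup.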
fn log_system_info(cfg: &ServerConfigOpts) {
info!("Manager configuration:");
info!(" Profile: {}", cfg.profile);
info!(" Bind: {}:{}+{}", cfg.bind, cfg.tcp_port, cfg.quic_port);
info!(" Advertise: {}", cfg.advertise_host);
#[cfg(unix)]
{
if let Ok(limit) = std::process::Command::new("ulimit").arg("-n").output()
&& let Ok(output) = String::from_utf8(limit.stdout)
{
info!("File descriptor limit: {}", output.trim());
}
}
let pid = std::process::id();
info!("Process ID: {}", pid);
}
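/// Listens for SIGTERM/SIGINT and exits the process cleanly on either.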
#[cfg(unix)]
fn spawn_shutdown_signal_handlers(profile: String) {
tokio::spawn(async move {
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM handler");
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to setup SIGINT handler");
tokio::select! {
_ = sigterm.recv() => {
warn!("Received SIGTERM, manager {} shutting down gracefully", profile);
std::process::exit(0);
}
_ = sigint.recv() => {
warn!("Received SIGINT, manager {} shutting down gracefully", profile);
std::process::exit(0);
}
}
});
}
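/// Spawns a 30-second periodic health monitor. Every 4th tick it logs a status
/// summary, every 12th it samples process resources, and every 20th it probes
/// for lock contention. Whenever the collected metrics move past fixed
/// thresholds (a 0.05 health-score change, a 10-point CPU/memory/load swing,
/// or any worker-count change), the peer version is bumped and broadcast so
/// peers pick up the new health data.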
fn spawn_health_monitor(
profile: String,
peers: AliveTable,
workers: crate::workers::WorkerTable,
health: connection::HealthContext,
peer_version: Arc<AtomicU64>,
alive_tx: broadcast::Sender<u64>,
) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
let mut last_peer_count = 0;
let mut last_worker_count = 0;
let mut check_count = 0;
let mut last_health_snapshot: Option<volli_core::HealthMetrics> = None;
loop {
interval.tick().await;
check_count += 1;
let peer_count = peers
.lock()
.await
.values()
.filter(|p| p.conn_state != ConnectionState::Inactive)
.count();
let workers_list = crate::workers::list_workers(&workers).await;
let worker_total = workers_list.len();
let worker_active = workers_list
.iter()
.filter(|w| w.disconnected_at.is_none())
.count();
            if check_count % 4 == 0 {
                info!(
                    "Manager {} health check #{}: {} peers, {} workers ({} active), uptime: ~{}s",
                    profile,
                    check_count,
                    peer_count,
                    worker_total,
                    worker_active,
                    check_count * 30
                );
            }
            if peer_count != last_peer_count {
                info!("Manager {} peer count: {} -> {}", profile, last_peer_count, peer_count);
                last_peer_count = peer_count;
            }
            if worker_active != last_worker_count {
                info!("Manager {} active workers: {} -> {}", profile, last_worker_count, worker_active);
                last_worker_count = worker_active;
            }
if check_count % 12 == 0 {
check_system_resources(&profile).await;
}
if check_count % 20 == 0 {
check_for_potential_issues(&profile, &peers, &workers).await;
}
let cur = health.collector.lock().await.collect_metrics().await;
let now = cur.last_health_update;
let cur_core = volli_core::HealthMetrics {
health_score: cur.health_score,
load_percentage: cur.load_percentage,
max_workers: cur.max_workers,
current_workers: cur.current_workers,
avg_cpu: cur.avg_cpu,
avg_memory: cur.avg_memory,
last_health_update: now,
};
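            // Only bump the gossip version when metrics have moved meaningfully,
            // so peers aren't flooded with near-identical health updates.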
let should_bump = match &last_health_snapshot {
None => true,
Some(prev) => {
let hs_delta = (prev.health_score - cur_core.health_score).abs();
let cpu_delta = match (prev.avg_cpu, cur_core.avg_cpu) {
(Some(a), Some(b)) => (a - b).abs(),
_ => 0.0,
};
let mem_delta = match (prev.avg_memory, cur_core.avg_memory) {
(Some(a), Some(b)) => (a - b).abs(),
_ => 0.0,
};
let workers_changed = prev.current_workers != cur_core.current_workers;
let load_delta = (prev.load_percentage - cur_core.load_percentage).abs();
hs_delta >= 0.05
|| cpu_delta >= 10.0
|| mem_delta >= 10.0
|| workers_changed
|| load_delta >= 10.0
}
};
if should_bump {
last_health_snapshot = Some(cur_core);
let new_ver = peer_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
let _ = crate::keys::save_peer_version(&profile, new_ver);
let _ = alive_tx.send(new_ver);
tracing::debug!(target: "health", profile=%profile, version=new_ver, "bumped peer version due to health change");
}
}
});
}
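/// Samples process-level resource usage: RSS/VSZ via `ps` on macOS, and the
/// open file-descriptor count via `lsof` on Unix, warning on high usage.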
async fn check_system_resources(profile: &str) {
let pid = std::process::id();
#[cfg(target_os = "macos")]
{
if let Ok(output) = tokio::process::Command::new("ps")
.args(["-o", "rss,vsz", "-p", &pid.to_string()])
.output()
.await
&& let Ok(result) = String::from_utf8(output.stdout)
{
let lines: Vec<&str> = result.trim().lines().collect();
if lines.len() > 1
&& let Some(memory_line) = lines.get(1)
{
let parts: Vec<&str> = memory_line.split_whitespace().collect();
if parts.len() >= 2
&& let (Ok(rss), Ok(vsz)) = (parts[0].parse::<u64>(), parts[1].parse::<u64>())
{
let rss_mb = rss / 1024;
let vsz_mb = vsz / 1024;
info!(
"Manager {} memory: {}MB RSS, {}MB VSZ",
profile, rss_mb, vsz_mb
);
if rss_mb > 500 {
warn!("Manager {} high memory usage: {}MB RSS", profile, rss_mb);
}
}
}
}
}
#[cfg(unix)]
{
if let Ok(output) = tokio::process::Command::new("lsof")
.args(["-p", &pid.to_string()])
.output()
.await
{
let fd_count = String::from_utf8_lossy(&output.stdout)
.lines()
.count()
.saturating_sub(1);
if fd_count > 100 {
info!("Manager {} file descriptors: {}", profile, fd_count);
}
if fd_count > 800 {
warn!(
"Manager {} high file descriptor usage: {}",
profile, fd_count
);
}
}
}
}
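/// Cheap liveness heuristics: tries to take each shared table lock with a
/// timeout (to flag potential deadlocks) and measures how long a bare
/// `yield_now` takes (to flag a starved scheduler).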
async fn check_for_potential_issues(
profile: &str,
peers: &AliveTable,
workers: &crate::workers::WorkerTable,
) {
    if tokio::time::timeout(std::time::Duration::from_secs(5), peers.lock())
        .await
        .is_err()
    {
        error!(
            "🔒 Manager {} potential deadlock: peer table lock timeout",
            profile
        );
    }
    if tokio::time::timeout(std::time::Duration::from_secs(5), workers.lock())
        .await
        .is_err()
    {
        error!(
            "🔒 Manager {} potential deadlock: worker table lock timeout",
            profile
        );
    }
let test_start = std::time::Instant::now();
tokio::task::yield_now().await;
let yield_time = test_start.elapsed();
if yield_time > std::time::Duration::from_millis(100) {
warn!(
"🐌 Manager {} slow task scheduling: {:?}",
profile, yield_time
);
}
}
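/// Spawns a task that dumps peer/worker/health state to stdout whenever the
/// process receives SIGUSR1 (e.g. `kill -USR1 <pid>`).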
#[cfg(unix)]
fn spawn_state_signal_handler(
peers: AliveTable,
workers: crate::workers::WorkerTable,
health: connection::HealthContext,
profile: String,
) {
tokio::spawn(async move {
if let Ok(mut sig) = signal(SignalKind::user_defined1()) {
while sig.recv().await.is_some() {
info!(
"📡 Received SIGUSR1 signal, printing state for manager: {}",
profile
);
print_peer_states(&peers, &workers, &health, &profile).await;
}
} else {
error!(
"Failed to setup SIGUSR1 signal handler for manager: {}",
profile
);
}
});
}
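/// Prints a one-line health summary for this manager followed by a sorted,
/// color-coded list of known peers (direction arrows for client/server links,
/// `×` for inactive) with per-peer health when available.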
#[cfg(unix)]
async fn print_peer_states(
peers: &AliveTable,
workers: &crate::workers::WorkerTable,
health: &connection::HealthContext,
profile: &str,
) {
let map = peers.lock().await;
let current_health = if let Ok(mut collector) = health.collector.try_lock() {
Some(collector.collect_metrics().await)
} else {
None
};
let worker_count = workers.lock().await.len();
let peer_entries = load_peers_for_gossip(profile).unwrap_or_default();
tracing::debug!(target: "peer_states", profile=%profile, peer_count=%peer_entries.len(), "loaded peer entries for status display");
let manager_data: std::collections::HashMap<
String,
(String, Option<volli_core::HealthMetrics>),
> = peer_entries
.iter()
.map(|p| {
(
p.manager_id.clone(),
(p.manager_name.clone(), p.health.clone()),
)
})
.collect();
tracing::debug!(target: "peer_states", profile=%profile, name_count=%manager_data.len(), "built manager data map");
let mut parts = Vec::new();
for (full_id, state) in map.iter() {
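        // Alive-table keys appear to be namespaced as `<tenant>:<cluster>:<manager_id>`;
        // take the last segment when the key has exactly three, else use it as-is.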
let manager_id = match full_id.split(':').collect::<Vec<_>>() {
segments if segments.len() == 3 => segments[2],
_ => full_id,
};
let (name_disp, peer_health) = manager_data
.get(manager_id)
.cloned()
.unwrap_or_else(|| {
tracing::debug!(target: "peer_states", manager_id=%manager_id, "no data found, using short_id");
(short_id(manager_id).to_string(), None)
});
let (sym, color) = match state.conn_state {
ConnectionState::Client => ("⇢", Color::Green),
ConnectionState::Server => ("⇠", Color::Green),
ConnectionState::Inactive => ("×", Color::Red),
};
let health_info = if let Some(health) = peer_health {
format!(
" (workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}%)",
health.current_workers,
health.health_score * 100.0,
health.avg_cpu.unwrap_or(0.0),
health.avg_memory.unwrap_or(0.0)
)
} else {
String::new()
};
parts.push((
full_id.clone(),
format!("{}{}{}", color.paint(sym), name_disp, health_info),
));
}
parts.sort_by(|a, b| a.0.cmp(&b.0));
let sorted_parts: Vec<String> = parts.into_iter().map(|(_, part)| part).collect();
if let Some(health) = current_health {
println!(
"[{}] workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}% load:{:.0}%",
profile,
worker_count,
health.health_score * 100.0,
health.avg_cpu.unwrap_or(0.0),
health.avg_memory.unwrap_or(0.0),
health.load_percentage
);
} else {
println!(
"[{}] workers:{} (health data unavailable)",
profile, worker_count
);
}
if sorted_parts.is_empty() {
println!("peers: none");
} else {
println!("peers: {}", sorted_parts.join(" "));
}
}