use super::ManagerContext;
use crate::{
ManagerPeerEntry, Message, load_peers_for_gossip, util::now_secs, workers::update_worker,
};
use ed25519_dalek::SigningKey;
use eyre::{Report, eyre};
use ipnet::IpNet;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::time;
use tracing::{info, warn};
use volli_commands::{CommandStatus, PrivilegeLevel, ResourceLimits, WorkerCapabilities};
use volli_core::WorkerEntry;
use volli_core::env_config;
use volli_core::handshake;
use volli_core::token::{
decode_token, encode_token, issue_bootstrap_token, issue_token, verify_token,
verify_token_signature,
};
use volli_transport::MessageTransportExt;
use volli_transport::Transport;
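/// Clone the manager's own peer entry and attach a fresh health snapshot
/// from the shared collector, for inclusion in outgoing `Announce` messages.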
async fn update_self_meta_with_health(ctx: &ManagerContext) -> ManagerPeerEntry {
let mut self_meta = ctx.state.self_meta.clone();
let health_metrics = ctx.health.collector.lock().await.collect_metrics().await;
self_meta.health = Some(volli_core::HealthMetrics {
health_score: health_metrics.health_score,
load_percentage: health_metrics.load_percentage,
max_workers: health_metrics.max_workers,
current_workers: health_metrics.current_workers,
avg_cpu: health_metrics.avg_cpu,
avg_memory: health_metrics.avg_memory,
last_health_update: health_metrics.last_health_update,
});
self_meta
}
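/// Authenticate a connecting worker before the session starts.
///
/// Rejects peers outside the whitelist (when one is configured), then waits
/// up to the configured auth timeout for an `Auth` message. Expired tokens
/// still inside the refresh grace window are re-issued in place, and the
/// deadline is re-armed after each refresh exchange; standalone
/// `TokenRefreshRequest` messages are also serviced. On success, returns the
/// resolved worker id (the explicit id, or the token's id when it is not the
/// shared `"*"` wildcard) and the optional worker name.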
async fn auth_worker(
transport: &mut Box<dyn Transport>,
csk: &[u8; 32],
peer: std::net::SocketAddr,
whitelist: &Arc<Vec<IpNet>>,
tenant: &str,
cluster: &str,
) -> Result<(String, Option<String>), Report> {
if !whitelist.is_empty() && !whitelist.iter().any(|n| n.contains(&peer.ip())) {
warn!(%peer, "worker connection rejected by whitelist");
return Err(eyre!("not allowed"));
}
let timeout = env_config().auth_timeout_secs();
let mut deadline = Instant::now() + time::Duration::from_secs(timeout);
loop {
match time::timeout_at(deadline.into(), transport.recv()).await {
Ok(Ok(Some(Message::Auth {
token,
worker_id,
worker_name,
}))) => {
let token = decode_token(&token)?;
if token.payload.tenant != tenant || token.payload.cluster != cluster {
warn!(
%peer,
token_tenant = %token.payload.tenant,
token_cluster = %token.payload.cluster,
expected_tenant = %tenant,
expected_cluster = %cluster,
"worker token tenant/cluster mismatch"
);
transport.send(&Message::AuthErr).await.ok();
return Err(eyre!("tenant/cluster mismatch"));
}
let now = now_secs();
if token.payload.exp < now {
if should_allow_refresh(&token, now)
&& verify_token_signature(&token, csk).is_ok()
{
if let Ok(refresh) = refresh_worker_token(csk, &token) {
let _ = transport
.send(&Message::TokenRefreshOk { token: refresh })
.await;
} else {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: "refresh failed".to_string(),
})
.await;
}
deadline = Instant::now() + time::Duration::from_secs(timeout);
continue;
}
transport.send(&Message::AuthErr).await.ok();
return Err(eyre!("token expired"));
}
verify_token(&token, csk)?;
let resolved_worker_id = if let Some(id) = worker_id {
if id.trim().is_empty() { None } else { Some(id) }
} else if token.payload.worker_id.is_empty() || token.payload.worker_id == "*" {
None
} else {
Some(token.payload.worker_id)
};
let resolved_worker_id = match resolved_worker_id {
Some(id) if !id.is_empty() => id,
_ => {
transport.send(&Message::AuthErr).await.ok();
return Err(eyre!("invalid worker"));
}
};
transport.send(&Message::AuthOk).await?;
return Ok((resolved_worker_id, worker_name));
}
Ok(Ok(Some(Message::TokenRefreshRequest { token }))) => {
let token = decode_token(&token)?;
let now = now_secs();
if token.payload.tenant != tenant || token.payload.cluster != cluster {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: "tenant/cluster mismatch".to_string(),
})
.await;
} else if token.payload.exp < now {
if should_allow_refresh(&token, now)
&& verify_token_signature(&token, csk).is_ok()
{
match refresh_worker_token(csk, &token) {
Ok(refresh) => {
let _ = transport
.send(&Message::TokenRefreshOk { token: refresh })
.await;
}
Err(e) => {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: format!("refresh failed: {e}"),
})
.await;
}
}
} else {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: "expired".to_string(),
})
.await;
}
} else {
match verify_token(&token, csk) {
Ok(_) => match refresh_worker_token(csk, &token) {
Ok(refresh) => {
let _ = transport
.send(&Message::TokenRefreshOk { token: refresh })
.await;
}
Err(e) => {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: format!("refresh failed: {e}"),
})
.await;
}
},
Err(e) => {
let _ = transport
.send(&Message::TokenRefreshErr {
reason: format!("refresh verify error: {e}"),
})
.await;
}
}
}
deadline = Instant::now() + time::Duration::from_secs(timeout);
continue;
}
Ok(Ok(_)) => {
transport.send(&Message::AuthErr).await.ok();
return Err(eyre!("missing auth"));
}
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
warn!(%peer, "worker auth timed out");
transport.send(&Message::AuthErr).await.ok();
return Err(eyre!("auth timeout"));
}
}
}
}
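/// Returns true while an expired token is still within its refresh grace
/// window: one original TTL (minimum 60 s) past `exp`. For example, a token
/// issued at t=0 with `exp`=3600 stays refreshable until t=7200.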
fn should_allow_refresh(token: &volli_core::token::Token, now: u64) -> bool {
let ttl = token.payload.exp.saturating_sub(token.payload.iat).max(60);
now <= token.payload.exp.saturating_add(ttl)
}
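/// Re-issue a token with the same tenant, cluster, worker id, and original
/// TTL (minimum 60 s). Bootstrap fields (host, ports, TLS cert) are carried
/// over when present, and the result is returned already encoded.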
fn refresh_worker_token(csk: &[u8; 32], tok: &volli_core::token::Token) -> Result<String, Report> {
let ttl = tok.payload.exp.saturating_sub(tok.payload.iat).max(60);
let refreshed = if let (Some(host), Some(quic), Some(tcp), Some(cert)) = (
tok.payload.host.as_deref(),
tok.payload.quic_port,
tok.payload.tcp_port,
tok.payload.cert.clone(),
) {
issue_bootstrap_token(
csk,
&tok.payload.tenant,
&tok.payload.cluster,
&tok.payload.worker_id,
ttl,
host,
quic,
tcp,
cert,
)
} else {
issue_token(
csk,
&tok.payload.tenant,
&tok.payload.cluster,
&tok.payload.worker_id,
ttl,
)
};
refreshed.and_then(|t| encode_token(&t))
}
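/// Post-auth session handshake: send `Hello` with a freshly generated nonce
/// signed by the manager key, and require the worker to echo the manager id,
/// nonce, and signature back in `Welcome`, confirming it completed the
/// exchange on this same session.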
async fn handshake_worker(
transport: &mut Box<dyn Transport>,
signing: &SigningKey,
manager_id: &str,
) -> Result<(), Report> {
let mut nonce = [0u8; 32];
getrandom::fill(&mut nonce)?;
let sig = handshake::sign_nonce(signing, &nonce);
let sig_send = sig.clone();
transport
.send(&Message::Hello {
manager_id: manager_id.to_string(),
nonce,
sig: sig_send,
})
.await?;
match transport.recv().await? {
Some(Message::Welcome {
manager_id: cid,
nonce: rnonce,
sig: rsig,
}) if cid == manager_id && rnonce == nonce && rsig == sig => Ok(()),
_ => Err(eyre!("handshake failed")),
}
}
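/// Handle a full worker session on an accepted transport.
///
/// Flow: per-IP auth backoff gate, token auth, handshake, registration
/// (health collector, worker table, command distributor), an initial
/// `Announce` of the current gossip view, then the event loop below until
/// the worker disconnects, errors, says `Goodbye`, or goes idle past the
/// heartbeat grace period. Cleanup unregisters the worker and bumps the
/// peer version so other handlers re-announce.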
pub(crate) async fn handle_worker_join(
ctx: &ManagerContext,
mut transport: Box<dyn Transport>,
peer: std::net::SocketAddr,
) -> Result<(), Report> {
{
let mut backoff = ctx.auth_backoff.lock().await;
if !backoff.allow(peer.ip()) {
warn!(%peer, "worker auth backoff active");
return Ok(());
}
}
let csk = *ctx.security.csk.read().await;
let auth_result = auth_worker(
&mut transport,
&csk,
peer,
&ctx.network.worker_nets,
&ctx.state.self_meta.tenant,
&ctx.state.self_meta.cluster,
)
.await;
let (worker_id, worker_name) = match auth_result {
Ok(v) => v,
Err(e) => {
let delay = {
let mut backoff = ctx.auth_backoff.lock().await;
backoff.record_failure(peer.ip())
};
warn!(%peer, delay_ms=%delay.as_millis(), "worker auth failed; backing off");
return Err(e);
}
};
{
let mut backoff = ctx.auth_backoff.lock().await;
backoff.record_success(peer.ip());
}
let signing = ctx
    .security
    .signing
    .as_ref()
    .ok_or_else(|| eyre!("manager signing key not configured"))?;
handshake_worker(&mut transport, signing, &ctx.manager_id).await?;
info!(target: "connection", %peer, "connection established");
let worker_count = ctx
.health
.collector
.lock()
.await
.register_worker(&worker_id);
info!(target: "health", %peer, worker_count, "worker connected, updated count");
let prev = worker_count.saturating_sub(1);
info!(
    "Manager {} gained a worker: {} -> {}",
    ctx.communication.profile, prev, worker_count
);
let new_ver = ctx
.state
.peer_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1;
let _ = crate::save_peer_version(&ctx.communication.profile, new_ver);
let _ = ctx.communication.alive_tx.send(new_ver);
update_worker(
&ctx.state.workers,
&ctx.communication.alive_tx,
WorkerEntry {
worker_id: worker_id.clone(),
manager_id: ctx.manager_id.clone(),
worker_name: worker_name.clone(),
last_seen: Some(now_secs()),
connected_since: Some(now_secs()),
disconnected_at: None,
},
&ctx.state.peer_version,
)
.await;
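// Per-session channel: the command distributor pushes command messages
// here, and the event loop below forwards them to the worker over the
// transport.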
let (worker_tx, mut worker_rx) = mpsc::unbounded_channel();
if let Some(distributor) = &ctx.state.command_distributor {
distributor.unregister_worker(&worker_id).await;
let supported_commands: std::collections::HashSet<String> = [
    "ping", "traceroute", "dns", "port-scan", "http", "load", "connect", "exec",
]
.iter()
.map(|s| s.to_string())
.collect();
let capabilities = WorkerCapabilities {
supported_commands,
privilege_level: PrivilegeLevel::User,
max_execution_time: std::time::Duration::from_secs(300),
resource_limits: ResourceLimits {
max_memory_mb: None,
max_cpu_percent: None,
max_network_bandwidth: None,
},
};
distributor
.register_worker(worker_id.clone(), worker_tx, HashMap::new(), capabilities)
.await;
info!(target: "connection", %peer, worker_id, "worker registered with command distributor");
}
let mut last_version_sent = ctx
.state
.peer_version
.load(std::sync::atomic::Ordering::SeqCst);
let peers_list = load_peers_for_gossip(&ctx.communication.profile).unwrap_or_default();
transport
.send(&Message::Announce {
meta: Box::new(update_self_meta_with_health(ctx).await),
version: last_version_sent,
peers: peers_list,
workers: Vec::new(),
})
.await?;
let mut peer_rx = ctx.communication.alive_tx.subscribe();
let ecfg = volli_core::env_config();
let hb_secs = ecfg.heartbeat_secs();
let idle_grace = std::time::Duration::from_secs(hb_secs * ecfg.worker_idle_multiplier());
use tokio::time::{Instant, sleep};
let mut last_seen = Instant::now();
let idle_sleep = sleep(idle_grace);
tokio::pin!(idle_sleep);
tracing::info!(target: "connection", %peer, worker_id, idle_secs = idle_grace.as_secs(), "worker handler started; liveness timer armed");
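// Main session loop: multiplex (1) gossip version bumps from other
// handlers, (2) inbound messages from the worker, (3) commands from the
// distributor, and (4) the idle-timeout timer. Note that tokio's Instant
// shadows std::time::Instant inside this function.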
loop {
tokio::select! {
Ok(new_ver) = peer_rx.recv() => {
if new_ver > last_version_sent {
last_version_sent = new_ver;
let peers = load_peers_for_gossip(&ctx.communication.profile).unwrap_or_default();
let _ = transport
.send(&Message::Announce {
meta: Box::new(update_self_meta_with_health(ctx).await),
version: new_ver,
peers,
workers: Vec::new(),
})
.await;
}
}
transport_msg = transport.recv() => {
match transport_msg {
Ok(Some(msg)) => {
match msg {
Message::TokenRefreshRequest { token } => {
// Delegate to the shared refresh helper rather than duplicating the
// reissue logic inline.
match decode_token(&token).and_then(|t| verify_token(&t, &csk).map(|_| t)) {
    Ok(tok) => match refresh_worker_token(&csk, &tok) {
        Ok(encoded) => {
            let _ = transport.send(&Message::TokenRefreshOk { token: encoded }).await;
        }
        Err(e) => {
            let _ = transport
                .send(&Message::TokenRefreshErr {
                    reason: format!("refresh failed: {e}"),
                })
                .await;
        }
    },
    Err(e) => {
        let _ = transport
            .send(&Message::TokenRefreshErr {
                reason: format!("refresh verify error: {e}"),
            })
            .await;
    }
}
}
Message::Ping { version: wver } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
let cur = ctx.state.peer_version.load(std::sync::atomic::Ordering::SeqCst);
if wver < cur {
let peers = load_peers_for_gossip(&ctx.communication.profile).unwrap_or_default();
tracing::debug!(target: "connection", %peer, worker_id, count=%peers.len(), "gossip catch-up");
if transport
.send(&Message::Announce {
meta: Box::new(update_self_meta_with_health(ctx).await),
version: cur,
peers,
workers: Vec::new(),
})
.await
.is_err()
{
break;
}
}
}
Message::WorkerCommandResponse { request_id, worker_id: _resp_worker_id, success, duration_millis, output } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
tracing::info!(target: "connection", %peer, worker_id, %request_id, "liveness: response; idle reset");
info!(target: "connection", %peer, worker_id, request_id,
"Manager received worker response: success={}, duration={}ms, output_len={}",
success, duration_millis, output.len());
if let Some(distributor) = &ctx.state.command_distributor {
let status = if success { CommandStatus::Success } else { CommandStatus::Failed };
let duration = std::time::Duration::from_millis(duration_millis);
info!(target: "connection", %peer, worker_id, request_id,
"Processing worker response with status: {:?}", status);
let footer = volli_commands::CommandFooter::Custom {
name: "generic".to_string(),
data: serde_json::json!({
"output": output,
"success": success,
"duration_ms": duration_millis
}),
};
match distributor
.handle_command_complete(volli_commands::distribution::CompletionArgs {
request_id: request_id.clone(),
worker_id: worker_id.clone(),
total_stream_count: 0,
duration,
final_status: status,
footer_data: footer,
metadata: HashMap::new(),
})
.await
{
Ok(_) => info!(target: "connection", %peer, worker_id, request_id,
"Successfully forwarded worker response to CommandDistributor"),
Err(e) => warn!(target: "connection", %peer, worker_id, request_id,
"Failed to forward worker response to CommandDistributor: {}", e),
}
} else {
warn!(target: "connection", %peer, worker_id, request_id,
"No CommandDistributor available to handle worker response");
}
}
Message::WorkerCommandHeader { request_id, worker_id: wid, payload } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
tracing::info!(target: "connection", %peer, worker_id, %request_id, "liveness: header; idle reset");
if let Some(distributor) = &ctx.state.command_distributor {
match volli_commands::wire::decode_payload::<volli_commands::CommandHeader>(&payload) {
Ok(header) => {
let _ = distributor
.handle_command_header(request_id, wid, ctx.manager_id.clone(), header, std::time::SystemTime::now())
.await;
}
Err(err) => {
warn!(target: "connection", %peer, worker_id, %request_id, "Failed to decode worker header: {}", err);
}
}
}
}
Message::WorkerCommandStream { request_id, worker_id: wid, payload } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
tracing::debug!(target: "connection", %peer, worker_id, %request_id, "liveness: stream; idle reset");
if let Some(distributor) = &ctx.state.command_distributor {
match volli_commands::wire::decode_payload::<volli_commands::CommandStream>(&payload) {
Ok(stream) => {
let _ = distributor
.handle_command_stream(request_id, wid, 0, stream, std::time::SystemTime::now())
.await;
}
Err(err) => {
warn!(target: "connection", %peer, worker_id, %request_id, "Failed to decode worker stream: {}", err);
}
}
}
}
Message::WorkerCommandFooter { request_id, worker_id: wid, payload, duration_millis, success } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
tracing::info!(target: "connection", %peer, worker_id, %request_id, success, "liveness: footer; idle reset");
if let Some(distributor) = &ctx.state.command_distributor {
match volli_commands::wire::decode_payload::<volli_commands::CommandFooter>(&payload) {
Ok(footer) => {
let status = if success { volli_commands::CommandStatus::Success } else { volli_commands::CommandStatus::Failed };
let _ = distributor
.handle_command_complete(volli_commands::distribution::CompletionArgs {
request_id,
worker_id: wid,
total_stream_count: 0,
duration: std::time::Duration::from_millis(duration_millis),
final_status: status,
footer_data: footer,
metadata: HashMap::new(),
})
.await;
}
Err(err) => {
warn!(target: "connection", %peer, worker_id, %request_id, "Failed to decode worker footer: {}", err);
}
}
}
}
Message::WorkerCommandError { request_id, error } => {
last_seen = Instant::now();
idle_sleep.as_mut().reset(Instant::now() + idle_grace);
tracing::info!(target: "connection", %peer, worker_id, %request_id, "liveness: error; idle reset");
warn!(target: "connection", %peer, worker_id, request_id,
"Manager received worker error: {}", error);
if let Some(distributor) = &ctx.state.command_distributor {
match distributor
    .handle_command_error(request_id.clone(), error.clone(), None, Vec::new())
    .await
{
Ok(_) => info!(target: "connection", %peer, worker_id, request_id,
"Successfully forwarded worker error to CommandDistributor"),
Err(e) => warn!(target: "connection", %peer, worker_id, request_id,
"Failed to forward worker error to CommandDistributor: {}", e),
}
} else {
warn!(target: "connection", %peer, worker_id, request_id,
"No CommandDistributor available to handle worker error");
}
}
Message::Goodbye => {
info!(target: "connection", %peer, worker_id, "received goodbye; closing session");
break;
}
_ => {}
}
}
Ok(None) => {
info!(target: "connection", %peer, worker_id, "transport closed by peer");
break;
}
Err(e) => {
warn!(target: "connection", %peer, worker_id, error = %e, "transport receive error; closing session");
break;
}
}
}
cmd_msg = worker_rx.recv() => {
match cmd_msg {
Some(volli_commands::Message::CommandRequest {
request_id,
command,
args,
timeout_secs,
options,
..
}) => {
let options_json = match serde_json::to_string(&options) {
Ok(json) => json,
Err(e) => {
warn!(target: "connection", %peer, worker_id, request_id, "failed to serialize command options: {}", e);
continue;
}
};
let worker_msg = Message::WorkerCommandRequest {
request_id: request_id.clone(),
command,
args,
timeout_secs,
options: options_json,
};
if let Err(e) = transport.send(&worker_msg).await {
warn!(target: "connection", %peer, worker_id, request_id,
"Failed to send command to worker: {}", e);
if let Some(distributor) = &ctx.state.command_distributor {
let _ = distributor
    .handle_command_error(
        request_id,
        format!("Failed to send command to worker: {}", e),
        None,
        Vec::new(),
    )
    .await;
}
} else {
info!(target: "connection", %peer, worker_id, request_id,
"Successfully forwarded command to worker");
}
}
Some(volli_commands::Message::CommandCancel { request_id }) => {
let worker_msg = Message::WorkerCommandCancel { request_id: request_id.clone() };
if let Err(e) = transport.send(&worker_msg).await {
warn!(target: "connection", %peer, worker_id, request_id,
"Failed to send cancel to worker: {}", e);
} else {
info!(target: "connection", %peer, worker_id, request_id,
"Forwarded cancel to worker");
}
}
Some(_other_message) => {
tracing::debug!(target: "connection", %peer, worker_id, "received unsupported command message type");
}
None => {
tracing::debug!(target: "connection", %peer, worker_id, "command distributor channel closed");
}
}
}
_ = &mut idle_sleep => {
let elapsed = Instant::now().duration_since(last_seen).as_secs();
warn!(target: "connection", %peer, worker_id, elapsed_secs = elapsed, idle_secs = idle_grace.as_secs(), "worker idle timeout; disconnecting");
break;
}
}
}
info!(target: "connection", %peer, worker_id, "worker handler loop exited; performing cleanup");
if let Some(distributor) = &ctx.state.command_distributor {
distributor.unregister_worker(&worker_id).await;
info!(target: "connection", %peer, worker_id, "worker unregistered from command distributor");
}
crate::workers::mark_worker_disconnected(&ctx.state.workers, &worker_id).await;
let worker_count = ctx
.health
.collector
.lock()
.await
.unregister_worker(&worker_id);
info!(target: "health", %peer, worker_count, "worker disconnected, updated count");
let prev = worker_count.saturating_add(1);
warn!(
    "Manager {} lost a worker: {} -> {}",
    ctx.communication.profile, prev, worker_count
);
let new_ver = ctx
.state
.peer_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1;
let _ = crate::save_peer_version(&ctx.communication.profile, new_ver);
let _ = ctx.communication.alive_tx.send(new_ver);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::connection::{
CommunicationContext, HealthContext, NetworkContext, SecurityContext, StateContext,
};
use crate::peers::AliveTable;
use crate::workers::WorkerTable;
use ed25519_dalek::SigningKey;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64};
use tempfile::TempDir;
use tokio::sync::{Mutex, RwLock, broadcast, oneshot};
use tokio::time;
use volli_core::{EnvironmentConfig, override_config_dir, override_env_config};
use volli_transport::MemoryTransport;
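/// Build a minimal in-memory `ManagerContext` for tests: a fresh signing
/// key, empty peer/worker tables, empty whitelists, and no command
/// distributor.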
fn build_ctx(profile: &str, tenant: &str, cluster: &str, csk: [u8; 32]) -> ManagerContext {
let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
let workers: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let peer_version = Arc::new(AtomicU64::new(1));
let (alive_tx, _) = broadcast::channel(8);
let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
let signing = {
let mut seed = [0u8; 32];
getrandom::fill(&mut seed).expect("rng");
Arc::new(SigningKey::from_bytes(&seed))
};
let csk_ver = Arc::new(AtomicU32::new(1));
let self_meta = ManagerPeerEntry {
manager_id: profile.into(),
manager_name: profile.into(),
tenant: tenant.into(),
cluster: cluster.into(),
host: "127.0.0.1".into(),
tcp_port: 0,
quic_port: 0,
pub_fp: "0".repeat(64),
csk_ver: 1,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
ManagerContext::new_server(
SecurityContext {
signing: Some(signing),
csk: Arc::new(RwLock::new(csk)),
csk_ver,
},
NetworkContext {
worker_nets: Arc::new(Vec::new()),
manager_nets: Arc::new(Vec::new()),
},
StateContext {
peers,
workers,
self_meta,
peer_version,
command_distributor: None,
},
CommunicationContext {
alive_tx,
dial_tx,
profile: profile.into(),
},
HealthContext::default(),
profile.into(),
)
}
#[tokio::test(flavor = "current_thread")]
async fn worker_auth_rejects_tenant_mismatch() {
let tmp = TempDir::new().unwrap();
let _cfg = override_config_dir(Some(tmp.path()));
let ctx = build_ctx("mgr-auth", "tenant-a", "fleet-a", [2u8; 32]);
let token =
volli_core::token::issue_token(&[2u8; 32], "tenant-b", "fleet-a", "w1", 60).unwrap();
let encoded = volli_core::token::encode_token(&token).unwrap();
let (mut client, server) = MemoryTransport::pair();
let server_task = tokio::spawn(async move {
handle_worker_join(&ctx, Box::new(server), "127.0.0.1:0".parse().unwrap()).await
});
client
.send(&Message::Auth {
token: encoded,
worker_id: None,
worker_name: None,
})
.await
.unwrap();
match client.recv().await.unwrap() {
Some(Message::AuthErr) => {}
other => panic!("expected AuthErr, got {other:?}"),
}
let result = server_task.await.unwrap();
assert!(result.is_err(), "expected auth mismatch error");
}
#[tokio::test(flavor = "current_thread")]
async fn worker_idle_timeout_disconnects() {
time::pause();
let _env = override_env_config(EnvironmentConfig {
heartbeat_secs: Some(1),
worker_idle_multiplier: Some(1),
auth_timeout_secs: Some(1),
..Default::default()
});
let tmp = TempDir::new().unwrap();
let _cfg = override_config_dir(Some(tmp.path()));
let csk = [3u8; 32];
let ctx = build_ctx("mgr-idle", "tenant-a", "fleet-a", csk);
let token = volli_core::token::issue_token(&csk, "tenant-a", "fleet-a", "w1", 60).unwrap();
let encoded = volli_core::token::encode_token(&token).unwrap();
let (mut client, server) = MemoryTransport::pair();
let ctx_clone = ctx.clone();
let server_task = tokio::spawn(async move {
handle_worker_join(&ctx_clone, Box::new(server), "127.0.0.1:0".parse().unwrap()).await
});
let (done_tx, done_rx) = oneshot::channel::<()>();
let client_task = tokio::spawn(async move {
client
.send(&Message::Auth {
token: encoded,
worker_id: None,
worker_name: None,
})
.await
.unwrap();
match client.recv().await.unwrap() {
Some(Message::AuthOk) => {}
other => panic!("expected AuthOk, got {other:?}"),
}
match client.recv().await.unwrap() {
Some(Message::Hello {
manager_id,
nonce,
sig,
}) => {
client
.send(&Message::Welcome {
manager_id,
nonce,
sig,
})
.await
.unwrap();
}
other => panic!("expected Hello, got {other:?}"),
}
let _ = client.recv().await;
let _ = done_rx.await;
});
time::advance(std::time::Duration::from_secs(2)).await;
let result = server_task.await.unwrap();
assert!(result.is_ok(), "expected idle disconnect to return Ok");
let worker_count = ctx.health.collector.lock().await.worker_count();
assert_eq!(worker_count, 0);
assert!(
ctx.state
.peer_version
.load(std::sync::atomic::Ordering::SeqCst)
>= 3
);
drop(done_tx);
client_task.abort();
let _ = client_task.await;
}
#[tokio::test(flavor = "current_thread")]
async fn health_override_is_announced_to_workers() {
time::pause();
let _env = override_env_config(EnvironmentConfig {
heartbeat_secs: Some(1),
..Default::default()
});
let tmp = TempDir::new().unwrap();
let _cfg = override_config_dir(Some(tmp.path()));
let csk = [7u8; 32];
let ctx = build_ctx("mgr-health", "tenant-a", "fleet-a", csk);
let token = volli_core::token::issue_token(&csk, "tenant-a", "fleet-a", "w1", 60).unwrap();
let encoded = volli_core::token::encode_token(&token).unwrap();
let (mut client, server) = MemoryTransport::pair();
let ctx_clone = ctx.clone();
let server_task = tokio::spawn(async move {
handle_worker_join(&ctx_clone, Box::new(server), "127.0.0.1:0".parse().unwrap()).await
});
client
.send(&Message::Auth {
token: encoded,
worker_id: None,
worker_name: None,
})
.await
.unwrap();
match client.recv().await.unwrap() {
Some(Message::AuthOk) => {}
other => panic!("expected AuthOk, got {other:?}"),
}
match client.recv().await.unwrap() {
Some(Message::Hello {
manager_id,
nonce,
sig,
}) => {
client
.send(&Message::Welcome {
manager_id,
nonce,
sig,
})
.await
.unwrap();
}
other => panic!("expected Hello, got {other:?}"),
}
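// Drain the initial Announce sent right after the handshake.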
if let Some(Message::Announce { .. }) = client.recv().await.unwrap() {}
{
let mut collector = ctx.health.collector.lock().await;
collector.set_override(Some(crate::health::HealthOverride {
health_score: Some(0.25),
avg_cpu: Some(90.0),
avg_memory: Some(80.0),
load_percentage: Some(90.0),
current_workers: Some(5),
max_workers: Some(10),
}));
}
ctx.state
.peer_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
client.send(&Message::Ping { version: 0 }).await.unwrap();
let mut got_override = false;
for _ in 0..3 {
if let Some(Message::Announce { meta, .. }) = client.recv().await.unwrap()
&& let Some(h) = meta.health
{
assert!((h.health_score - 0.25).abs() < f32::EPSILON);
assert_eq!(h.current_workers, 5);
got_override = true;
break;
}
}
assert!(got_override, "expected Announce with override health");
{
let mut collector = ctx.health.collector.lock().await;
collector.clear_override();
}
ctx.state
.peer_version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
client.send(&Message::Ping { version: 0 }).await.unwrap();
let mut got_reset = false;
for _ in 0..3 {
if let Some(Message::Announce { meta, .. }) = client.recv().await.unwrap()
&& let Some(h) = meta.health
{
assert!((h.health_score - 0.25).abs() > f32::EPSILON);
got_reset = true;
break;
}
}
assert!(got_reset, "expected Announce after reset");
drop(client);
let _ = server_task.await;
}
#[tokio::test(flavor = "current_thread")]
async fn worker_auth_accepts_shared_token_with_id() {
let tmp = TempDir::new().unwrap();
let _cfg = override_config_dir(Some(tmp.path()));
let csk = [9u8; 32];
let ctx = build_ctx("mgr-shared", "tenant-a", "fleet-a", csk);
let token = volli_core::token::issue_token(&csk, "tenant-a", "fleet-a", "*", 60).unwrap();
let encoded = volli_core::token::encode_token(&token).unwrap();
let (mut client, server) = MemoryTransport::pair();
let ctx_clone = ctx.clone();
let server_task = tokio::spawn(async move {
handle_worker_join(&ctx_clone, Box::new(server), "127.0.0.1:0".parse().unwrap()).await
});
client
.send(&Message::Auth {
token: encoded,
worker_id: Some("shared-worker-1".to_string()),
worker_name: Some("shared-worker-1".to_string()),
})
.await
.unwrap();
match client.recv().await.unwrap() {
Some(Message::AuthOk) => {}
other => panic!("expected AuthOk, got {other:?}"),
}
match client.recv().await.unwrap() {
Some(Message::Hello {
manager_id,
nonce,
sig,
}) => {
client
.send(&Message::Welcome {
manager_id,
nonce,
sig,
})
.await
.unwrap();
}
other => panic!("expected Hello, got {other:?}"),
}
let result = tokio::time::timeout(std::time::Duration::from_millis(200), async {
loop {
if let Some(entry) = ctx.state.workers.lock().await.get("shared-worker-1") {
assert_eq!(entry.worker_name.as_deref(), Some("shared-worker-1"));
break;
}
tokio::task::yield_now().await;
}
})
.await;
assert!(result.is_ok(), "worker entry not registered in time");
drop(client);
let _ = server_task.await;
}
#[tokio::test(flavor = "current_thread")]
async fn worker_auth_rejects_shared_token_without_id() {
let tmp = TempDir::new().unwrap();
let _cfg = override_config_dir(Some(tmp.path()));
let csk = [9u8; 32];
let ctx = build_ctx("mgr-shared-missing", "tenant-a", "fleet-a", csk);
let token = volli_core::token::issue_token(&csk, "tenant-a", "fleet-a", "*", 60).unwrap();
let encoded = volli_core::token::encode_token(&token).unwrap();
let (mut client, server) = MemoryTransport::pair();
let ctx_clone = ctx.clone();
let server_task = tokio::spawn(async move {
handle_worker_join(&ctx_clone, Box::new(server), "127.0.0.1:0".parse().unwrap()).await
});
client
.send(&Message::Auth {
token: encoded,
worker_id: None,
worker_name: None,
})
.await
.unwrap();
match client.recv().await.unwrap() {
Some(Message::AuthErr) => {}
other => panic!("expected AuthErr, got {other:?}"),
}
let result = server_task.await.unwrap();
assert!(result.is_err(), "expected auth error");
}
}