use super::*;
use crate::connection::handle_manager_as_client;
use crate::keys::load_csk;
use crate::test_harness::{InMemoryHarness, ManagerOptions};
use std::collections::HashMap;
use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicU64},
};
use tempfile::TempDir;
use tokio::sync::{Mutex, RwLock, broadcast};
use volli_core::{
EnvironmentConfig, Role, WorkerConfig, override_env_config, override_env_config_patch,
};
use volli_transport::MemoryTransport;
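// A joining manager authenticates with its join token, receives the current
// cluster signing key (CSK) from the remote side, and persists it under the
// active profile.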
#[tokio::test(flavor = "current_thread")]
async fn join_receives_cluster_key() {
let _env_guard = override_env_config(EnvironmentConfig {
interval_ms: Some(0),
backoff_ms: Some(0),
connect_timeout_ms: Some(1),
auth_timeout_secs: Some(1),
heartbeat_secs: Some(1),
..Default::default()
});
let base = TempDir::new().unwrap();
let _cfg_guard = volli_core::override_config_dir(Some(base.path()));
    let _env_patch_guard = override_env_config_patch(EnvironmentConfig {
interval_ms: Some(10),
backoff_ms: Some(10),
..Default::default()
});
let profile = "client";
let dir = crate::keys::secret_dir(Some(profile));
std::fs::create_dir_all(&dir).unwrap();
let csk = [7u8; 32];
let csk_ver = 42;
let tok = volli_core::token::issue_token(&csk, "self", "default", "joiner", 60).unwrap();
let encoded = volli_core::token::encode_token(&tok).unwrap();
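    // In-memory duplex pair: `client` is driven by the real client loop,
    // while `server` is scripted below to play the remote manager.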
let (client, mut server) = MemoryTransport::pair();
let self_meta = ManagerPeerEntry {
manager_id: "client".into(),
manager_name: "client".into(),
tenant: "self".into(),
cluster: "default".into(),
host: "127.0.0.1".into(),
tcp_port: 0,
quic_port: 0,
pub_fp: String::new(),
csk_ver: 0,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
let peers = Arc::new(Mutex::new(HashMap::new()));
let workers = Arc::new(Mutex::new(HashMap::new()));
let (alive_tx, _) = broadcast::channel(16);
let cfg = WorkerConfig {
role: Role::Manager,
token: encoded.clone(),
..Default::default()
};
let csk_shared = Arc::new(RwLock::new([0u8; 32]));
let csk_ver_shared = Arc::new(AtomicU32::new(0));
let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
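    // Assemble a minimal manager context: no signing key, an all-zero CSK at
    // version 0, and empty peer/worker tables.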
let ctx = ManagerContext::new_peer(
connection::SecurityContext {
signing: None,
csk: csk_shared.clone(),
csk_ver: csk_ver_shared.clone(),
},
connection::StateContext {
peers: peers.clone(),
workers,
self_meta: self_meta.clone(),
peer_version: Arc::new(AtomicU64::new(1)),
command_distributor: None,
},
connection::CommunicationContext {
alive_tx: alive_tx.clone(),
dial_tx,
profile: profile.to_string(),
},
cfg.clone(),
connection::HealthContext::default(),
);
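    // Run the real client-side handshake loop against the scripted server.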
let client_fut = tokio::spawn(handle_manager_as_client(
ctx,
Box::new(client),
"127.0.0.1".into(),
));
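    // Scripted remote side: expect Auth with our token, acknowledge it, push
    // the cluster key, then wait for the client's Announce.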
let server_fut = async move {
match server.recv().await.unwrap() {
Some(Message::Auth { token, .. }) => {
assert_eq!(token, encoded);
}
other => panic!("expected auth, got {other:?}"),
}
server.send(&Message::AuthOk).await.unwrap();
server
.send(&Message::ClusterKey { ver: csk_ver, csk })
.await
.unwrap();
match server.recv().await.unwrap() {
Some(Message::Announce { .. }) => {}
other => panic!("expected heartbeat, got {other:?}"),
}
};
let _ = tokio::join!(client_fut, server_fut);
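    // The client loop should have persisted the received key and version.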
let (saved, ver) = load_csk(profile).unwrap().unwrap();
assert_eq!(saved, csk);
assert_eq!(ver, csk_ver);
}
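/// Builds a minimal `ManagerPeerEntry` for tenant "self", cluster "default".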
fn make_meta(id: &str) -> ManagerPeerEntry {
ManagerPeerEntry {
manager_id: id.into(),
manager_name: id.into(),
tenant: "self".into(),
cluster: "default".into(),
host: "127.0.0.1".into(),
tcp_port: 0,
quic_port: 0,
pub_fp: String::new(),
csk_ver: 0,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
}
}
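// An Announce carrying a peer list should merge both the announcing manager
// and any gossiped peers into the local peer map.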
#[tokio::test(flavor = "current_thread")]
async fn gossip_propagates_peer_list() {
let _env_guard = override_env_config(EnvironmentConfig {
interval_ms: Some(0),
backoff_ms: Some(0),
connect_timeout_ms: Some(1),
auth_timeout_secs: Some(1),
heartbeat_secs: Some(1),
..Default::default()
});
let base = TempDir::new().unwrap();
let _cfg_guard = volli_core::override_config_dir(Some(base.path()));
    let _env_patch_guard = override_env_config_patch(EnvironmentConfig {
interval_ms: Some(10),
backoff_ms: Some(10),
..Default::default()
});
let profile = "node-b";
let dir = crate::keys::secret_dir(Some(profile));
std::fs::create_dir_all(&dir).unwrap();
let csk = [3u8; 32];
let tok = volli_core::token::issue_token(&csk, "self", "default", "joiner", 60).unwrap();
let encoded = volli_core::token::encode_token(&tok).unwrap();
let (client, mut server) = MemoryTransport::pair();
let self_meta = make_meta("b");
let peers = Arc::new(Mutex::new(HashMap::new()));
let workers = Arc::new(Mutex::new(HashMap::new()));
let (alive_tx, _) = broadcast::channel(16);
let cfg = WorkerConfig {
role: Role::Manager,
token: encoded.clone(),
..Default::default()
};
let csk_shared = Arc::new(RwLock::new([0u8; 32]));
let csk_ver_shared = Arc::new(AtomicU32::new(0));
let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
let ctx = ManagerContext::new_peer(
connection::SecurityContext {
signing: None,
csk: csk_shared.clone(),
csk_ver: csk_ver_shared.clone(),
},
connection::StateContext {
peers: peers.clone(),
workers,
self_meta: self_meta.clone(),
peer_version: Arc::new(AtomicU64::new(1)),
command_distributor: None,
},
connection::CommunicationContext {
alive_tx: alive_tx.clone(),
dial_tx,
profile: profile.to_string(),
},
cfg.clone(),
connection::HealthContext::default(),
);
let client_fut = tokio::spawn(handle_manager_as_client(
ctx,
Box::new(client),
"127.0.0.1".into(),
));
let remote_meta = make_meta("a");
let extra_peer = make_meta("c");
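    // Subscribe before the gossip exchange so the update is not missed.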
let mut ver_rx = alive_tx.subscribe();
let server_fut = async move {
match server.recv().await.unwrap() {
Some(Message::Auth { token, .. }) => assert_eq!(token, encoded),
other => panic!("expected auth, got {other:?}"),
}
server.send(&Message::AuthOk).await.unwrap();
server
.send(&Message::ClusterKey { ver: 1, csk })
.await
.unwrap();
match server.recv().await.unwrap() {
Some(Message::Announce { .. }) => {}
other => panic!("expected announce, got {other:?}"),
}
server
.send(&Message::Announce {
meta: Box::new(remote_meta),
version: 1,
peers: vec![extra_peer],
workers: Vec::new(),
})
.await
.unwrap();
let _ = ver_rx.recv().await.unwrap();
};
    let _ = tokio::join!(client_fut, server_fut);
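    // Peer map keys follow `tenant:cluster:manager_id`; both the announcing
    // peer "a" and the gossiped peer "c" should be present.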
let map = peers.lock().await;
assert!(map.contains_key("self:default:a"));
assert!(map.contains_key("self:default:c"));
}
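// Happy path: a worker connects to a manager through the in-memory harness.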
#[tokio::test(flavor = "current_thread")]
async fn handshake_succeeds_over_memory_transport() {
let harness = InMemoryHarness::new("tenant", "fleet");
let manager = harness.manager("manager-handshake");
let worker = harness.worker("worker-1");
worker.connect(&manager).shutdown().await;
}
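// A worker whose address falls inside the whitelist CIDR is accepted.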
#[tokio::test(flavor = "current_thread")]
async fn worker_whitelist_enforced() {
let opts = ManagerOptions {
worker_whitelist: vec!["127.0.0.1/32".parse().unwrap()],
..Default::default()
};
let harness = InMemoryHarness::new("tenant", "fleet");
let manager = harness.manager_with_options("manager-allow", opts);
let worker = harness.worker("worker-allow");
worker.connect(&manager).shutdown().await;
}
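// A worker whose address falls outside the whitelist CIDR is rejected even
// though it presents a valid token.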
#[tokio::test(flavor = "current_thread")]
async fn worker_whitelist_rejects() {
let opts = ManagerOptions {
worker_whitelist: vec!["10.0.0.0/8".parse().unwrap()],
..Default::default()
};
let harness = InMemoryHarness::new("tenant", "fleet");
let manager = harness.manager_with_options("manager-deny", opts);
let token = harness.worker_token("worker-deny");
let (mut client, server) = MemoryTransport::pair();
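    // Simulate a connection from 192.168.0.10, outside the 10.0.0.0/8 allowlist.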
let addr = std::net::SocketAddr::new(std::net::IpAddr::from([192, 168, 0, 10]), 0);
let server_task = manager.spawn_worker_task_with_addr(server, addr);
let _ = client
.send(&Message::Auth {
token,
worker_id: None,
worker_name: None,
})
.await;
drop(client);
let _ = server_task
.await
.unwrap()
.expect_err("connection should be rejected");
}
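/// Integration tests for the `HealthCollector` and its interaction with the
/// worker table.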
mod health_integration {
use crate::health::{HealthCollector, HealthConfig};
use crate::workers::{WorkerTable, list_workers, update_worker};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, broadcast};
use volli_core::{ConfigGuard, EnvironmentConfig, WorkerEntry, override_env_config};
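    /// Overrides timing-related environment config (intervals, backoffs,
    /// timeouts) so the tests run quickly.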
fn fast_env() -> ConfigGuard {
override_env_config(EnvironmentConfig {
interval_ms: Some(0),
backoff_ms: Some(0),
connect_timeout_ms: Some(10),
auth_timeout_secs: Some(1),
heartbeat_secs: Some(1),
..Default::default()
})
}
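    /// Builds a `HealthCollector` whose metric samples are fixed, keeping the
    /// collected metrics deterministic in tests.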
fn create_test_health_collector(
max_workers: Option<u32>,
metric_collection_interval: Duration,
) -> HealthCollector {
let config = HealthConfig {
max_workers,
metric_collection_interval,
sample_window_size: 3,
cpu_threshold: 80.0,
memory_threshold: 85.0,
};
HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0))
}
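    // Worker count increments and decrements should be reflected directly in
    // the collected metrics and the load percentage.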
#[tokio::test(flavor = "current_thread")]
async fn test_health_collector_basic_functionality() {
let _env = fast_env();
let mut collector = create_test_health_collector(Some(100), Duration::from_millis(100));
let initial_metrics = collector.collect_metrics().await;
assert_eq!(initial_metrics.current_workers, 0);
assert!(initial_metrics.health_score >= 0.0 && initial_metrics.health_score <= 1.0);
assert_eq!(initial_metrics.load_percentage, 0.0);
assert_eq!(initial_metrics.max_workers, Some(100));
assert!(collector.can_accept_worker());
collector.increment_worker_count();
let metrics_after_increment = collector.collect_metrics().await;
assert_eq!(metrics_after_increment.current_workers, 1);
assert_eq!(metrics_after_increment.load_percentage, 1.0);
collector.decrement_worker_count();
let metrics_after_decrement = collector.collect_metrics().await;
assert_eq!(metrics_after_decrement.current_workers, 0);
assert_eq!(metrics_after_decrement.load_percentage, 0.0);
}
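    // Registering workers in the table and mirroring each registration in the
    // collector should keep the two counts in sync.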
#[tokio::test(flavor = "current_thread")]
async fn test_health_integration_with_worker_table() {
let _env = fast_env();
let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let (tx, _rx) = broadcast::channel(1);
let version = Arc::new(std::sync::atomic::AtomicU64::new(1));
let mut collector = create_test_health_collector(Some(100), Duration::from_millis(100));
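        // Register five workers and mirror each registration in the collector.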
for i in 0..5 {
update_worker(
&table,
&tx,
WorkerEntry {
                    worker_id: format!("worker_{i}"),
manager_id: "test_manager".to_string(),
worker_name: None,
last_seen: Some(1000 + i),
connected_since: Some(1000 + i),
disconnected_at: None,
},
&version,
)
.await;
collector.increment_worker_count();
}
let workers = list_workers(&table).await;
let health_metrics = collector.collect_metrics().await;
assert_eq!(workers.len(), 5);
assert_eq!(health_metrics.current_workers, 5);
for _ in 0..2 {
collector.decrement_worker_count();
}
let updated_metrics = collector.collect_metrics().await;
assert_eq!(updated_metrics.current_workers, 3);
}
}