use super::{
CommunicationContext, HealthContext, ManagerContext, SecurityContext, StateContext,
handle_manager_as_client,
};
use crate::{Message, peers::AliveTable, workers::WorkerTable};
use serial_test::serial;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tempfile::TempDir;
use tokio::sync::{Mutex, RwLock, broadcast};
use volli_core::{
ConfigGuard, EnvironmentConfig, ManagerPeerEntry, Role, WorkerConfig, override_config_dir,
override_env_config, override_env_config_patch,
};
use volli_transport::{MemoryTransport, MessageTransportExt};
/// Build a `ManagerPeerEntry` with realistic defaults for dial-precedence tests.
///
/// All entries share tenant "test" / cluster "default" and a 64-char zero
/// fingerprint, so any two entries built here belong to the same mesh.
fn make_test_peer(id: &str, name: &str) -> ManagerPeerEntry {
    ManagerPeerEntry {
        tenant: String::from("test"),
        cluster: String::from("default"),
        manager_id: id.to_owned(),
        manager_name: name.to_owned(),
        host: String::from("127.0.0.1"),
        tcp_port: 4242,
        quic_port: 4243,
        // 64 hex zeroes stand in for a real public-key fingerprint.
        pub_fp: "0".repeat(64),
        csk_ver: 1,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    }
}
/// Minimal peer entry for gossip-propagation tests: tenant "t", cluster "c",
/// zero ports, version 0, and no TLS material.
fn make_test_peer_for_gossip(id: &str, name: &str) -> ManagerPeerEntry {
    ManagerPeerEntry {
        tenant: String::from("t"),
        cluster: String::from("c"),
        manager_id: id.to_owned(),
        manager_name: name.to_owned(),
        host: String::from("127.0.0.1"),
        tcp_port: 0,
        quic_port: 0,
        // Placeholder fingerprint; gossip tests never validate key material.
        pub_fp: "0".repeat(64),
        csk_ver: 0,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    }
}
/// Dial precedence is decided by comparing manager ids: the side with the
/// greater id initiates the connection, and no node ever dials itself.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn test_precedence_rules_validation() {
    let low = make_test_peer("aaa", "node-a");
    let mid = make_test_peer("bbb", "node-b");
    let high = make_test_peer("zzz", "node-z");

    // The lexicographically greater side dials the lesser one...
    for (dialer, target) in [(&high, &low), (&high, &mid), (&mid, &low)] {
        assert!(crate::mesh::should_dial_peer(dialer, target));
    }
    // ...while the lesser (or identical) side must never dial.
    for (dialer, target) in [(&low, &mid), (&low, &high), (&mid, &high), (&low, &low)] {
        assert!(!crate::mesh::should_dial_peer(dialer, target));
    }
}
/// Peers in a different tenant or cluster must never be dialed, in either
/// direction, even when id precedence would otherwise allow it.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn test_cross_tenant_isolation() {
    // Same tenant/cluster: "node-b" > "node-a", so `other` dials `anchor`.
    let anchor = make_test_peer("node-a", "a");
    let mut other = make_test_peer("node-b", "b");
    assert!(crate::mesh::should_dial_peer(&other, &anchor));

    // A different tenant severs dialing in both directions.
    other.tenant = "other-tenant".into();
    assert!(!crate::mesh::should_dial_peer(&other, &anchor));
    assert!(!crate::mesh::should_dial_peer(&anchor, &other));

    // Same tenant but a different cluster is equally isolated.
    other.tenant = anchor.tenant.clone();
    other.cluster = "other-cluster".into();
    assert!(!crate::mesh::should_dial_peer(&other, &anchor));
    assert!(!crate::mesh::should_dial_peer(&anchor, &other));
}
fn fast_env() -> ConfigGuard {
override_env_config(EnvironmentConfig {
interval_ms: Some(0),
backoff_ms: Some(0),
connect_timeout_ms: Some(10),
auth_timeout_secs: Some(1),
heartbeat_secs: Some(1),
..Default::default()
})
}
#[tokio::test(flavor = "current_thread")]
async fn self_not_added_from_peer_list() {
let _env = fast_env();
let base = TempDir::new().unwrap();
let _cfg_dir = override_config_dir(Some(base.path()));
let _env = override_env_config_patch(EnvironmentConfig {
interval_ms: Some(0),
backoff_ms: Some(0),
..Default::default()
});
let profile = "p2";
std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
let self_meta = ManagerPeerEntry {
manager_id: "self".into(),
manager_name: "self".into(),
tenant: "t".into(),
cluster: "c".into(),
host: "127.0.0.1".into(),
tcp_port: 4242,
quic_port: 4242,
pub_fp: "0".repeat(64),
csk_ver: 0,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
let cfg = Box::leak(Box::new(WorkerConfig {
host: "127.0.0.1".into(),
quic_port: 4242,
tcp_port: 0,
protocol: None,
token: "t".into(),
fingerprint: String::new(),
cert: Vec::new(),
role: Role::Manager,
}));
let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
let (alive_tx, _) = broadcast::channel(8);
let mut alive_rx = alive_tx.subscribe();
let workers: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let csk = Arc::new(RwLock::new([0u8; 32]));
let csk_ver = Arc::new(AtomicU32::new(0));
let peer_version = Arc::new(AtomicU64::new(0));
let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
let ctx = ManagerContext::new_peer(
SecurityContext {
signing: None,
csk,
csk_ver,
},
StateContext {
peers: peers.clone(),
workers,
self_meta: self_meta.clone(),
peer_version: peer_version.clone(),
command_distributor: None,
},
CommunicationContext {
alive_tx: alive_tx.clone(),
dial_tx,
profile: profile.into(),
},
cfg.clone(),
HealthContext::default(),
);
let (client, mut server) = MemoryTransport::pair();
let client_task = tokio::spawn(async move {
let _ = handle_manager_as_client(ctx, Box::new(client), "127.0.0.1:4242".into()).await;
});
match server.recv().await.unwrap() {
Some(Message::Auth { token, .. }) => assert_eq!(token, "t"),
other => panic!("expected auth, got {other:?}"),
}
server.send(&Message::AuthOk).await.unwrap();
server
.send(&Message::ClusterKey {
ver: 1,
csk: [0u8; 32],
})
.await
.unwrap();
let _ = server.recv().await.unwrap();
let remote_meta = ManagerPeerEntry {
manager_id: "remote".into(),
manager_name: "remote".into(),
tenant: "t".into(),
cluster: "c".into(),
host: "127.0.0.2".into(),
tcp_port: 0,
quic_port: 0,
pub_fp: "0".repeat(64),
csk_ver: 0,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
let self_entry = ManagerPeerEntry {
manager_id: self_meta.manager_id.clone(),
manager_name: String::new(),
tenant: self_meta.tenant.clone(),
cluster: self_meta.cluster.clone(),
host: self_meta.host.clone(),
tcp_port: self_meta.tcp_port,
quic_port: self_meta.quic_port,
pub_fp: "0".repeat(64),
csk_ver: 0,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
server
.send(&Message::Announce {
meta: Box::new(remote_meta),
version: peer_version.load(Ordering::SeqCst) + 1,
peers: vec![self_entry],
workers: vec![],
})
.await
.unwrap();
let _ = alive_rx.recv().await.expect("expected peer update");
drop(server);
client_task.abort();
let _ = client_task.await;
let cached = crate::peers::cached_peers_for_profile(profile).unwrap();
assert_eq!(cached.len(), 1);
assert_eq!(cached[0].manager_id, "remote");
assert_eq!(cached[0].manager_name, "remote");
}
#[tokio::test(flavor = "current_thread")]
async fn worker_announce_includes_full_peer_list() {
let _env = fast_env();
let base = TempDir::new().unwrap();
let _cfg_dir = override_config_dir(Some(base.path()));
let _env = override_env_config_patch(EnvironmentConfig {
interval_ms: Some(10),
backoff_ms: Some(10),
..Default::default()
});
let profile = "mgr_full";
std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
let extra = ManagerPeerEntry {
manager_id: "peer-x".into(),
manager_name: "peer-x".into(),
tenant: "self".into(),
cluster: "default".into(),
host: "127.0.0.9".into(),
tcp_port: 4242,
quic_port: 4242,
pub_fp: String::new(),
csk_ver: 1,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
crate::add_peer(profile, extra.clone()).unwrap();
let peers_to_send = crate::load_peers_for_gossip(profile).unwrap_or_default();
assert!(peers_to_send.iter().any(|p| p.manager_id == "peer-x"));
}
#[tokio::test(flavor = "current_thread")]
async fn gossip_coalesces_to_latest_version() {
let _env = fast_env();
let base = TempDir::new().unwrap();
let _cfg_dir = override_config_dir(Some(base.path()));
let _env = override_env_config_patch(EnvironmentConfig {
interval_ms: Some(5),
backoff_ms: Some(5),
..Default::default()
});
let profile = "gossip_prof";
std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
let self_meta = ManagerPeerEntry {
manager_id: "self".into(),
manager_name: "self".into(),
tenant: "t".into(),
cluster: "c".into(),
host: "127.0.0.1".into(),
tcp_port: 0,
quic_port: 0,
pub_fp: String::new(),
csk_ver: 1,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
let workers: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let peer_version = Arc::new(AtomicU64::new(1));
let (alive_tx, _) = broadcast::channel(8);
let mut alive_rx = alive_tx.subscribe();
let csk = Arc::new(RwLock::new([7u8; 32]));
let csk_ver = Arc::new(AtomicU32::new(1));
let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
let ctx = ManagerContext::new_peer(
SecurityContext {
signing: None,
csk,
csk_ver,
},
StateContext {
peers: peers.clone(),
workers,
self_meta: self_meta.clone(),
peer_version: peer_version.clone(),
command_distributor: None,
},
CommunicationContext {
alive_tx: alive_tx.clone(),
dial_tx: dial_tx.clone(),
profile: profile.to_string(),
},
WorkerConfig {
host: "127.0.0.1".into(),
quic_port: 0,
tcp_port: 0,
protocol: None,
token: "t".into(),
fingerprint: String::new(),
cert: Vec::new(),
role: Role::Manager,
},
HealthContext::default(),
);
let (client, mut server) = MemoryTransport::pair();
let client_task = tokio::spawn(async move {
let _ = handle_manager_as_client(ctx, Box::new(client), "mem".into()).await;
});
match server.recv().await.unwrap() {
Some(Message::Auth { .. }) => {}
other => panic!("expected auth, got {other:?}"),
}
server.send(&Message::AuthOk).await.unwrap();
server
.send(&Message::ClusterKey {
ver: 1,
csk: [7u8; 32],
})
.await
.unwrap();
let _ = server.recv().await.unwrap();
let p3 = ManagerPeerEntry {
manager_id: "p3".into(),
manager_name: "p3".into(),
tenant: "t".into(),
cluster: "c".into(),
host: "h3".into(),
tcp_port: 1,
quic_port: 1,
pub_fp: String::new(),
csk_ver: 1,
tls_cert: String::new(),
tls_fp: String::new(),
health: None,
};
server
.send(&Message::Announce {
meta: Box::new(self_meta.clone()),
version: 2,
peers: vec![make_test_peer_for_gossip("p2", "p2")],
workers: vec![],
})
.await
.unwrap();
server
.send(&Message::Announce {
meta: Box::new(self_meta.clone()),
version: 3,
peers: vec![p3.clone()],
workers: vec![],
})
.await
.unwrap();
loop {
match alive_rx.recv().await {
Ok(_) => {
let map = peers.lock().await;
if map.contains_key("t:c:p3") {
break;
}
}
Err(err) => panic!("peer update channel closed: {err}"),
}
}
drop(server);
client_task.abort();
let _ = client_task.await;
}
/// Round-trips a token through encode/decode and checks both the happy path
/// and rejection of a token whose issue time was back-dated past its TTL.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn test_token_validation() {
    use std::time::{SystemTime, UNIX_EPOCH};
    use volli_core::token::{decode_token, encode_token, issue_token, verify_token};
    let csk = [42u8; 32];
    // A freshly issued token verifies and keeps its payload across a round-trip.
    let valid_token = issue_token(&csk, "test", "default", "worker-1", 300).unwrap();
    let encoded = encode_token(&valid_token).unwrap();
    let decoded = decode_token(&encoded).unwrap();
    assert!(verify_token(&decoded, &csk).is_ok());
    assert_eq!(decoded.payload.worker_id, "worker-1");
    // Back-date the issue time by 120s against a 60s TTL so the token is expired.
    let past_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs()
        - 120;
    let mut expired_token = issue_token(&csk, "test", "default", "worker-1", 60).unwrap();
    expired_token.payload.iat = past_time;
    let encoded_expired = encode_token(&expired_token).unwrap();
    let decoded_expired = decode_token(&encoded_expired).unwrap();
    // Sanity check that the round-tripped iat is indeed older than the TTL window.
    assert!(
        decoded_expired.payload.iat
            < SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs()
                - 60
    );
    // The original test stopped at the arithmetic check above, which is true by
    // construction (now-120 < now-60) and never exercised rejection. Verification
    // must fail here: the payload was mutated after signing and is past its TTL.
    assert!(verify_token(&decoded_expired, &csk).is_err());
}
/// A dial token minted from local metadata must verify against the same
/// cluster key and carry the dialing manager's identity in its payload.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn test_dial_token_generation() {
    use volli_core::token::{decode_token, verify_token};
    let cluster_key = [42u8; 32];
    let meta = make_test_peer("self", "self-manager");

    // Mint, decode, and verify the token with the same key it was issued under.
    let encoded = crate::mesh::dial_token(&cluster_key, &meta).unwrap();
    let decoded = decode_token(&encoded).unwrap();
    assert!(verify_token(&decoded, &cluster_key).is_ok());

    // The payload mirrors the dialing manager's identity fields.
    assert_eq!(decoded.payload.worker_id, meta.manager_id);
    assert_eq!(decoded.payload.tenant, meta.tenant);
    assert_eq!(decoded.payload.cluster, meta.cluster);
}
/// A dial request carrying `gossip_hint: true` should be attempted right away,
/// and after that single attempt fails the dialer must enter its backoff sleep
/// rather than re-dialing the hint in a hot loop.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn gossip_hint_is_consumed_once_then_backoff() {
    let base = TempDir::new().unwrap();
    let _cfg_dir = override_config_dir(Some(base.path()));
    // 1 ms connect timeout + zero backoff so the failed dial and the
    // subsequent sleep happen almost immediately.
    let _env = override_env_config(EnvironmentConfig {
        connect_timeout_ms: Some(1),
        backoff_ms: Some(0),
        ..Default::default()
    });
    let self_meta = ManagerPeerEntry {
        manager_id: "self-123".into(),
        manager_name: "self".into(),
        tenant: "t".into(),
        cluster: "c".into(),
        host: "127.0.0.1".into(),
        // High ports that nothing in this test binds or dials.
        tcp_port: 65535,
        quic_port: 65534,
        pub_fp: "0".repeat(64),
        csk_ver: 1,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    };
    let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
    let workers: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    let peer_version = Arc::new(AtomicU64::new(1));
    let (alive_tx, _) = broadcast::channel(8);
    let csk = Arc::new(RwLock::new([1u8; 32]));
    let csk_ver = Arc::new(AtomicU32::new(1));
    let cfg = crate::ServerConfigOpts {
        profile: "test-prof".into(),
        ..Default::default()
    };
    std::fs::create_dir_all(crate::secret_dir(Some(&cfg.profile))).unwrap();
    // Start the real mesh runner; it hands back the channel used to feed it
    // peer dial requests.
    let dial_tx = super::mesh::spawn_mesh_runner(
        &cfg,
        self_meta.clone(),
        peers.clone(),
        peer_version.clone(),
        alive_tx.clone(),
        csk.clone(),
        csk_ver.clone(),
        workers.clone(),
        super::HealthContext::default(),
    );
    // Test hook: backoff sleeps report on this channel, so we can observe
    // "dialer went to sleep" without timing heuristics.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    crate::util::set_test_sleep_notifier(tx);
    let failing_peer = ManagerPeerEntry {
        manager_id: "peer-xyz".into(),
        manager_name: "peer".into(),
        tenant: "t".into(),
        cluster: "c".into(),
        // 192.0.2.0/24 is TEST-NET-1 (RFC 5737): never routed, so the dial
        // attempt is guaranteed to fail.
        host: "192.0.2.1".into(),
        tcp_port: 9,
        quic_port: 9,
        pub_fp: "0".repeat(64),
        csk_ver: 1,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    };
    dial_tx
        .send(super::mesh::PeerDialRequest {
            peer: failing_peer,
            gossip_hint: true,
        })
        .unwrap();
    // The hint is consumed by one failed dial; the dialer must then sleep,
    // which the notifier reports here.
    rx.recv().await.expect("dialer did not backoff after hint");
}
/// When the alive broadcast receiver lags (capacity-1 channel flooded with
/// version bumps), the announcing client must coalesce and announce the
/// *latest* peer-table version instead of replaying every dropped one.
#[tokio::test(flavor = "current_thread")]
#[serial]
async fn broadcast_lag_coalesces_to_latest_version() {
    let base = TempDir::new().unwrap();
    let _cfg_dir = override_config_dir(Some(base.path()));
    let _env = override_env_config(EnvironmentConfig {
        interval_ms: Some(0),
        backoff_ms: Some(0),
        ..Default::default()
    });
    let profile = "lag_prof";
    std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
    // A real signed token so the Auth round-trip uses the production path.
    let csk = [9u8; 32];
    let token = volli_core::token::issue_token(&csk, "t", "c", "self", 60).unwrap();
    let encoded = volli_core::token::encode_token(&token).unwrap();
    let self_meta = ManagerPeerEntry {
        manager_id: "self".into(),
        manager_name: "self".into(),
        tenant: "t".into(),
        cluster: "c".into(),
        host: "127.0.0.1".into(),
        tcp_port: 0,
        quic_port: 0,
        pub_fp: "0".repeat(64),
        csk_ver: 1,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    };
    let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
    let workers: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    let peer_version = Arc::new(AtomicU64::new(1));
    // Capacity 1 guarantees the subscriber lags once several versions are sent.
    let (alive_tx, _) = broadcast::channel(1);
    let csk_shared = Arc::new(RwLock::new(csk));
    let csk_ver = Arc::new(AtomicU32::new(1));
    let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();
    let ctx = ManagerContext::new_peer(
        SecurityContext {
            signing: None,
            csk: csk_shared.clone(),
            csk_ver: csk_ver.clone(),
        },
        StateContext {
            peers: peers.clone(),
            workers,
            self_meta: self_meta.clone(),
            peer_version: peer_version.clone(),
            command_distributor: None,
        },
        CommunicationContext {
            alive_tx: alive_tx.clone(),
            dial_tx,
            profile: profile.to_string(),
        },
        WorkerConfig {
            role: Role::Manager,
            token: encoded.clone(),
            ..Default::default()
        },
        HealthContext::default(),
    );
    let (client, mut server) = MemoryTransport::pair();
    let client_task = tokio::spawn(async move {
        let _ = handle_manager_as_client(ctx, Box::new(client), "mem".into()).await;
    });
    // Handshake: client authenticates, server acks and delivers the cluster key.
    match server.recv().await.unwrap() {
        Some(Message::Auth { .. }) => {}
        other => panic!("expected auth, got {other:?}"),
    }
    server.send(&Message::AuthOk).await.unwrap();
    server
        .send(&Message::ClusterKey { ver: 1, csk })
        .await
        .unwrap();
    // The first announce reflects the starting peer-table version.
    match server.recv().await.unwrap() {
        Some(Message::Announce { version, .. }) => {
            assert_eq!(version, 1);
        }
        other => panic!("expected initial announce, got {other:?}"),
    }
    // Jump the shared version to 9, then flood the capacity-1 broadcast with
    // every intermediate bump so the client's receiver is forced to lag.
    peer_version.store(9, Ordering::SeqCst);
    for ver in 2..10 {
        alive_tx.send(ver).unwrap();
    }
    // Interim announces are tolerated, but version 9 must arrive within the
    // timeout rather than being preceded by a full replay of 2..=8.
    let announce = tokio::time::timeout(std::time::Duration::from_millis(200), async {
        loop {
            match server.recv().await.unwrap() {
                Some(Message::Announce { version, .. }) => {
                    if version == 9 {
                        break version;
                    }
                }
                other => panic!("expected announce, got {other:?}"),
            }
        }
    })
    .await
    .expect("coalesced announce not received");
    assert_eq!(announce, 9);
    drop(server);
    client_task.abort();
    let _ = client_task.await;
}