use super::{HealthContext, ManagerContext, SecurityContext, StateContext};
use crate::{
ManagerPeerEntry,
connection::{CommunicationContext, manager::process_peer_update},
};
use std::collections::HashMap;
use std::sync::{
Arc,
atomic::{AtomicU32, AtomicU64},
};
use tempfile::TempDir;
use tokio::{
runtime::Builder,
sync::{Mutex, RwLock, broadcast},
};
use volli_core::{EnvironmentConfig, Role, WorkerConfig, override_config_dir, override_env_config};
/// Checks how `process_peer_update` treats a manager's own entry versus a
/// foreign one.
///
/// A peer context is built whose `self_meta` has the id `"self"`. Feeding a
/// distinct `"remote"` entry through `process_peer_update` must yield `true`,
/// while feeding the context's own entry back in must yield `false`.
#[test]
fn self_not_added_from_peer_list() {
    // Point the config directory at a throwaway location and zero out the
    // interval/backoff overrides. The returned handles stay bound until the
    // end of the test (presumably RAII guards that undo the override — confirm
    // against `volli_core`).
    let config_root = TempDir::new().unwrap();
    let _config_dir_override = override_config_dir(Some(config_root.path()));
    let _env_override = override_env_config(EnvironmentConfig {
        interval_ms: Some(0),
        backoff_ms: Some(0),
        ..Default::default()
    });

    let runtime = Builder::new_current_thread().enable_all().build().unwrap();
    runtime.block_on(async {
        // Identity of the manager under test.
        let own_entry = ManagerPeerEntry {
            manager_id: "self".into(),
            manager_name: "self".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "127.0.0.1".into(),
            tcp_port: 4242,
            quic_port: 4242,
            pub_fp: "0".repeat(64),
            csk_ver: 0,
            tls_cert: String::new(),
            tls_fp: String::new(),
            health: None,
        };

        // A second manager in the same tenant/cluster: only the identity,
        // host and ports differ from `own_entry`; every other field is copied.
        let foreign_entry = ManagerPeerEntry {
            manager_id: "remote".into(),
            manager_name: "remote".into(),
            host: "127.0.0.2".into(),
            tcp_port: 0,
            quic_port: 0,
            ..own_entry.clone()
        };

        // The broadcast receiver is discarded right away, whereas the dial
        // receiver stays bound so that channel remains open for the whole test.
        let (alive_tx, _) = broadcast::channel(8);
        let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();

        // Keep a handle on the peer table for the duration of the test; the
        // context receives its own reference to the same map.
        let peer_table = Arc::new(Mutex::new(HashMap::new()));

        let security = SecurityContext {
            signing: None,
            csk: Arc::new(RwLock::new([0u8; 32])),
            csk_ver: Arc::new(AtomicU32::new(0)),
        };
        let state = StateContext {
            peers: Arc::clone(&peer_table),
            workers: Arc::new(Mutex::new(HashMap::new())),
            self_meta: own_entry.clone(),
            peer_version: Arc::new(AtomicU64::new(0)),
            command_distributor: None,
        };
        let communication = CommunicationContext {
            alive_tx,
            dial_tx,
            profile: "profile".into(),
        };
        let worker_config = WorkerConfig {
            host: "127.0.0.1".into(),
            quic_port: 4242,
            tcp_port: 0,
            protocol: None,
            token: "t".into(),
            fingerprint: String::new(),
            cert: Vec::new(),
            role: Role::Manager,
        };
        let ctx = ManagerContext::new_peer(
            security,
            state,
            communication,
            worker_config,
            HealthContext::default(),
        );

        // A foreign peer must be accepted...
        let remote_accepted = process_peer_update(&ctx, foreign_entry).await;
        assert!(remote_accepted, "remote peer should be persisted");

        // ...but the manager's own entry must be rejected.
        let self_accepted = process_peer_update(&ctx, own_entry).await;
        assert!(!self_accepted, "self entry should be ignored");
    });
}