use crate::{
keys::{add_peer, load_peers, save_peer_version},
remove_peer,
util::{PeerLog, now_secs, test_dur},
};
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use tokio::{
sync::{Mutex, broadcast},
time,
};
use tracing::{debug, info};
use volli_core::{ConnectionState, ManagerPeerEntry, env_config};
use std::sync::{Mutex as StdMutex, OnceLock};
/// Peers for one profile, keyed by `"tenant:cluster:manager_id"`.
type ProfilePeerMap = std::collections::HashMap<String, ManagerPeerEntry>;
/// Per-profile peer caches, keyed by profile name.
type PeerCache = std::collections::HashMap<String, ProfilePeerMap>;

/// Process-wide peer cache, created lazily on first access.
static PEER_CACHE: OnceLock<std::sync::Arc<StdMutex<PeerCache>>> = OnceLock::new();

/// Returns the shared peer cache, initializing an empty one on first use.
fn peer_cache() -> &'static std::sync::Arc<StdMutex<PeerCache>> {
    PEER_CACHE.get_or_init(|| {
        std::sync::Arc::new(StdMutex::new(std::collections::HashMap::new()))
    })
}
/// Returns all cached peers for `profile`, loading the profile's peers from
/// persistent storage into the cache on first access.
///
/// Always returns `Some`; an unreadable store yields an empty list.
pub fn cached_peers_for_profile(profile: &str) -> Option<Vec<ManagerPeerEntry>> {
    let mut cache = peer_cache().lock().unwrap();
    let entries = cache.entry(profile.to_string()).or_insert_with(|| {
        // Hydrate from disk; a load failure leaves the profile cache empty.
        load_peers(profile)
            .map(|peers| {
                peers
                    .into_iter()
                    .map(|p| (format!("{}:{}:{}", p.tenant, p.cluster, p.manager_id), p))
                    .collect()
            })
            .unwrap_or_default()
    });
    Some(entries.values().cloned().collect())
}
/// Evicts one peer, identified by its tenant/cluster/manager triple, from the
/// in-memory cache for `profile`. A no-op when the profile or key is absent.
pub fn remove_cached_peer(profile: &str, manager_id: &str, tenant: &str, cluster: &str) {
    let key = format!("{}:{}:{}", tenant, cluster, manager_id);
    if let Some(prof) = peer_cache().lock().unwrap().get_mut(profile) {
        prof.remove(&key);
    }
}
/// Per-peer liveness bookkeeping held in the in-memory [`AliveTable`].
#[derive(Debug, Clone, Default)]
pub struct PeerState {
    // Unix timestamps (seconds) of the most recent connection events;
    // `None` until the corresponding event has been observed.
    pub last_connect_attempt: Option<u64>,
    pub last_connect_success: Option<u64>,
    pub last_connect_failure: Option<u64>,
    // NOTE(review): not read in this file — presumably throttles repeated
    // disconnect log lines; confirm at the call sites that maintain it.
    pub last_disconnect_log: Option<u64>,
    // Count of back-to-back failed connection attempts.
    pub consecutive_failures: u32,
    // NOTE(review): not read in this file — appears to mark peers given up
    // on; confirm semantics against the connection-management code.
    pub dead: bool,
    // Current connection state; `Inactive` peers become eligible for pruning
    // in `prune_stale_peers` / `sweep_dead`.
    pub conn_state: ConnectionState,
}
/// Shared, async-locked map from `"tenant:cluster:manager_id"` keys to [`PeerState`].
pub type AliveTable = Arc<Mutex<HashMap<String, PeerState>>>;
/// Broadcast sender carrying the new peer-list version after each change.
pub type AliveTx = broadcast::Sender<u64>;
/// Processes a liveness update for the peer described by `meta`.
///
/// Persists the peer and broadcasts a bumped peer-list version when the peer
/// is new to `table` or its reported health changed materially (score, CPU,
/// memory, load, or worker-count thresholds).
///
/// Returns `true` when the peer was not previously tracked in `table`.
pub async fn update_alive(
    table: &AliveTable,
    tx: &AliveTx,
    profile: &str,
    meta: ManagerPeerEntry,
    version: &Arc<AtomicU64>,
) -> bool {
    let key = format!("{}:{}:{}", meta.tenant, meta.cluster, meta.manager_id);
    let mut existing_name = None;
    let mut material_change = false;
    {
        // Compare the incoming health snapshot with the cached one to decide
        // whether this update must be persisted and broadcast.
        let cache_arc = peer_cache();
        let mut cache = cache_arc.lock().unwrap();
        let prof = cache.entry(profile.to_string()).or_insert_with(|| {
            // Lazily hydrate this profile's cache from persistent storage.
            let mut m = std::collections::HashMap::new();
            if let Ok(peers) = load_peers(profile) {
                for p in peers {
                    let k = format!("{}:{}:{}", p.tenant, p.cluster, p.manager_id);
                    m.insert(k, p);
                }
            }
            m
        });
        if let Some(prev) = prof.get(&key) {
            existing_name = Some(prev.manager_name.clone());
            match (&prev.health, &meta.health) {
                (Some(old), Some(new)) => {
                    let hs_delta = (old.health_score - new.health_score).abs();
                    let cpu_delta = match (old.avg_cpu, new.avg_cpu) {
                        (Some(a), Some(b)) => (a - b).abs(),
                        _ => 0.0,
                    };
                    let mem_delta = match (old.avg_memory, new.avg_memory) {
                        (Some(a), Some(b)) => (a - b).abs(),
                        _ => 0.0,
                    };
                    let load_delta = (old.load_percentage - new.load_percentage).abs();
                    let workers_changed = old.current_workers != new.current_workers;
                    // Thresholds filter out jitter so minor fluctuations do
                    // not cause persistence and version churn on every update.
                    if hs_delta >= 0.05
                        || cpu_delta >= 10.0
                        || mem_delta >= 10.0
                        || load_delta >= 10.0
                        || workers_changed
                    {
                        material_change = true;
                    }
                }
                // Health appearing or disappearing is always material.
                (None, Some(_)) | (Some(_), None) => {
                    material_change = true;
                }
                (None, None) => {}
            }
        }
    }
    // Detect newness and insert the liveness entry under a single lock so two
    // concurrent updates for the same key cannot both observe `is_new == true`
    // (the previous check-then-insert had a TOCTOU window between two
    // separate lock acquisitions, which could double-persist and bump the
    // version twice for one new peer).
    let is_new = {
        let mut map = table.lock().await;
        let before = map.len();
        map.entry(key.clone()).or_default();
        map.len() != before
    };
    if is_new || material_change {
        let mut peer_meta = meta.clone();
        if peer_meta.manager_name.is_empty() {
            // Keep a previously learned display name instead of blanking it;
            // fall back to the host when no name was ever known.
            peer_meta.manager_name = existing_name.unwrap_or_else(|| meta.host.clone());
        }
        if let Err(e) = add_peer(profile, peer_meta) {
            tracing::warn!("Failed to save peer info for {}: {}", key, e);
        }
        {
            // Refresh the cache entry so future delta comparisons are made
            // against this update.
            let cache_arc = peer_cache();
            let mut cache = cache_arc.lock().unwrap();
            if let Some(prof) = cache.get_mut(profile) {
                prof.insert(key.clone(), meta.clone());
            }
        }
        // `fetch_add` returns the previous value; broadcast the new one.
        let new_ver = version.fetch_add(1, Ordering::SeqCst) + 1;
        let _ = save_peer_version(profile, new_ver);
        let _ = tx.send(new_ver);
    }
    is_new
}
/// Sets the connection state for `meta`'s peer, creating a default
/// [`PeerState`] entry if the peer is not yet tracked.
pub async fn set_peer_state(table: &AliveTable, meta: &ManagerPeerEntry, state: ConnectionState) {
    let key = format!("{}:{}:{}", meta.tenant, meta.cluster, meta.manager_id);
    table.lock().await.entry(key).or_default().conn_state = state;
}
/// Removes peers that are [`ConnectionState::Inactive`] and whose most recent
/// activity (failure, attempt, or success — or none at all) is at least
/// `older_than_secs` old. Removed peers are deleted from persistent storage
/// and the cache, and a bumped peer-list version is broadcast when anything
/// was pruned.
///
/// Returns the number of peers removed from the in-memory table.
pub async fn prune_stale_peers(
    table: &AliveTable,
    profile: &str,
    tx: &AliveTx,
    version: &Arc<AtomicU64>,
    older_than_secs: u64,
) -> usize {
    let now = now_secs();
    let metas = cached_peers_for_profile(profile)
        .unwrap_or_else(|| load_peers(profile).unwrap_or_default());
    let meta_map: HashMap<_, _> = metas
        .into_iter()
        .map(|m| (format!("{}:{}:{}", m.tenant, m.cluster, m.manager_id), m))
        .collect();
    // Collect doomed keys first, then remove them, all under one lock.
    let removed_ids: Vec<String> = {
        let mut map = table.lock().await;
        let doomed: Vec<String> = map
            .iter()
            .filter(|(_, state)| {
                let inactive = state.conn_state == ConnectionState::Inactive;
                // A peer with no recorded timestamps counts as stale.
                let stale = state
                    .last_connect_failure
                    .or(state.last_connect_attempt)
                    .or(state.last_connect_success)
                    .map_or(true, |ts| now.saturating_sub(ts) >= older_than_secs);
                inactive && stale
            })
            .map(|(id, _)| id.clone())
            .collect();
        for id in &doomed {
            map.remove(id);
        }
        doomed
    };
    // Persistent storage and cache are only touched for peers whose metadata
    // is known; table-only entries are simply dropped.
    for id in &removed_ids {
        if let Some(meta) = meta_map.get(id) {
            let _ = remove_peer(profile, &meta.manager_id, &meta.tenant, &meta.cluster);
            remove_cached_peer(profile, &meta.manager_id, &meta.tenant, &meta.cluster);
        }
    }
    if !removed_ids.is_empty() {
        let new_ver = version.fetch_add(1, Ordering::SeqCst) + 1;
        let _ = save_peer_version(profile, new_ver);
        let _ = tx.send(new_ver);
    }
    removed_ids.len()
}
/// Fallback sweep interval (seconds) when the environment config sets none.
const DEFAULT_PEER_CLEANUP_SECS: u64 = 30;

/// Returns the configured peer-cleanup interval in seconds, falling back to
/// [`DEFAULT_PEER_CLEANUP_SECS`].
fn peer_cleanup_secs() -> u64 {
    match env_config().peer_cleanup_secs {
        Some(secs) => secs,
        None => DEFAULT_PEER_CLEANUP_SECS,
    }
}
pub async fn sweep_dead(table: AliveTable, profile: String, tx: AliveTx, version: Arc<AtomicU64>) {
loop {
time::sleep(test_dur(peer_cleanup_secs())).await;
let now = now_secs();
let metas = cached_peers_for_profile(&profile)
.unwrap_or_else(|| load_peers(&profile).unwrap_or_default());
let meta_map: HashMap<_, _> = metas
.into_iter()
.map(|m| (format!("{}:{}:{}", m.tenant, m.cluster, m.manager_id), m))
.collect();
let mut map = table.lock().await;
let mut removed = Vec::new();
map.retain(|id, v| {
let inactive = matches!(v.conn_state, ConnectionState::Inactive);
let expired = v
.last_connect_failure
.is_some_and(|t| now.saturating_sub(t) > peer_cleanup_secs());
let alive = !(inactive && expired);
if !alive {
removed.push(id.clone());
if let Some(meta) = meta_map.get(id) {
info!(peer=%PeerLog(meta), "peer removed");
} else {
info!(peer=%id, "peer removed");
}
}
alive
});
drop(map);
for id in &removed {
if let Some(meta) = meta_map.get(id) {
let _ = remove_peer(&profile, &meta.manager_id, &meta.tenant, &meta.cluster);
}
}
if !removed.is_empty() {
debug!(removed = removed.len(), "swept dead peers");
let new_ver = version.fetch_add(1, Ordering::SeqCst) + 1;
let _ = save_peer_version(&profile, new_ver);
let _ = tx.send(new_ver);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::atomic::AtomicU64;
    use tempfile::TempDir;
    use tokio::sync::broadcast::error::TryRecvError;
    use tokio::sync::{Mutex, broadcast};
    use volli_core::{
        EnvironmentConfig, HealthMetrics, override_config_dir, override_env_config_patch,
    };

    // A peer stored with a known name must keep that name when a later
    // update arrives with an empty manager_name.
    #[tokio::test]
    async fn update_alive_retains_existing_name() {
        // Isolate config/peer storage in a temp dir for this test.
        let base = TempDir::new().unwrap();
        let _cfg = override_config_dir(Some(base.path()));
        let _env = override_env_config_patch(EnvironmentConfig::default());
        let profile = "p";
        std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
        let existing = ManagerPeerEntry {
            manager_id: "id".into(),
            manager_name: "known".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: "0".repeat(64),
            csk_ver: 0,
            tls_cert: String::new(),
            tls_fp: String::new(),
            health: None,
        };
        crate::add_peer(profile, existing.clone()).unwrap();
        let table: AliveTable = Arc::new(Mutex::new(HashMap::new()));
        let (tx, _) = broadcast::channel(8);
        let ver = Arc::new(AtomicU64::new(1));
        // Same peer, but with its name blanked out.
        let mut meta = existing.clone();
        meta.manager_name = String::new();
        update_alive(&table, &tx, profile, meta, &ver).await;
        // The persisted record should still carry the original name.
        let stored = crate::load_peers(profile).unwrap();
        assert_eq!(stored[0].manager_name, "known");
    }

    // Only health changes beyond the material-change thresholds should bump
    // the version and broadcast; minor updates must be ignored.
    #[tokio::test]
    async fn update_alive_detects_material_health_change() {
        let base = TempDir::new().unwrap();
        let _cfg = override_config_dir(Some(base.path()));
        let _env = override_env_config_patch(EnvironmentConfig::default());
        let profile = "health-change";
        std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
        let table: AliveTable = Arc::new(Mutex::new(HashMap::new()));
        let (tx, _) = broadcast::channel(8);
        let mut rx = tx.subscribe();
        let ver = Arc::new(AtomicU64::new(1));
        let base_health = HealthMetrics {
            health_score: 0.9,
            load_percentage: 10.0,
            max_workers: Some(10),
            current_workers: 1,
            avg_cpu: Some(10.0),
            avg_memory: Some(20.0),
            last_health_update: 1,
        };
        let meta = ManagerPeerEntry {
            manager_id: "peer".into(),
            manager_name: "peer".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "127.0.0.1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: "0".repeat(64),
            csk_ver: 0,
            tls_cert: String::new(),
            tls_fp: String::new(),
            health: Some(base_health.clone()),
        };
        // First insert: peer is new, so update_alive returns true.
        assert!(update_alive(&table, &tx, profile, meta.clone(), &ver).await);
        // Drain the broadcast from the initial insert.
        let _ = rx.try_recv();
        let after_insert = ver.load(Ordering::SeqCst);
        // Only the timestamp changes — below every threshold, not material.
        let mut meta_same = meta.clone();
        if let Some(ref mut h) = meta_same.health {
            h.last_health_update = 2;
        }
        assert!(!update_alive(&table, &tx, profile, meta_same, &ver).await);
        // No version bump and no broadcast for the immaterial update.
        assert_eq!(ver.load(Ordering::SeqCst), after_insert);
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
        // Load jumps by 15 points — exceeds the 10.0 load_delta threshold.
        let mut meta_changed = meta.clone();
        if let Some(ref mut h) = meta_changed.health {
            h.load_percentage = 25.0;
        }
        // Still not new, but the material change bumps the version and sends.
        assert!(!update_alive(&table, &tx, profile, meta_changed, &ver).await);
        assert_eq!(ver.load(Ordering::SeqCst), after_insert + 1);
        assert!(rx.try_recv().is_ok());
    }

    // An inactive peer with no timestamps at all counts as stale and must be
    // pruned (including its persisted record).
    #[tokio::test]
    async fn prune_stale_peers_removes_inactive_without_timestamps() {
        let base = TempDir::new().unwrap();
        let _cfg = override_config_dir(Some(base.path()));
        let _env = override_env_config_patch(EnvironmentConfig::default());
        let profile = "prune-stale";
        std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
        let peer = ManagerPeerEntry {
            manager_id: "peer".into(),
            manager_name: "peer".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "127.0.0.1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: "0".repeat(64),
            csk_ver: 0,
            tls_cert: String::new(),
            tls_fp: String::new(),
            health: None,
        };
        crate::add_peer(profile, peer.clone()).unwrap();
        let table: AliveTable = Arc::new(Mutex::new(HashMap::new()));
        let (tx, _) = broadcast::channel(8);
        let ver = Arc::new(AtomicU64::new(1));
        {
            // Seed the table with an Inactive state that has no timestamps.
            let mut map = table.lock().await;
            let key = format!("{}:{}:{}", peer.tenant, peer.cluster, peer.manager_id);
            map.insert(
                key,
                PeerState {
                    conn_state: ConnectionState::Inactive,
                    ..Default::default()
                },
            );
        }
        let removed = prune_stale_peers(&table, profile, &tx, &ver, 0).await;
        assert_eq!(removed, 1);
        // Persistent storage must be emptied as well.
        let stored = crate::load_peers(profile).unwrap();
        assert!(stored.is_empty());
    }

    // An inactive peer whose last success is recent (within older_than_secs)
    // must be kept.
    #[tokio::test]
    async fn prune_stale_peers_keeps_recent_inactive() {
        let base = TempDir::new().unwrap();
        let _cfg = override_config_dir(Some(base.path()));
        let _env = override_env_config_patch(EnvironmentConfig::default());
        let profile = "prune-stale-keep";
        std::fs::create_dir_all(crate::secret_dir(Some(profile))).unwrap();
        let peer = ManagerPeerEntry {
            manager_id: "peer".into(),
            manager_name: "peer".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "127.0.0.1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: "0".repeat(64),
            csk_ver: 0,
            tls_cert: String::new(),
            tls_fp: String::new(),
            health: None,
        };
        crate::add_peer(profile, peer.clone()).unwrap();
        let table: AliveTable = Arc::new(Mutex::new(HashMap::new()));
        let (tx, _) = broadcast::channel(8);
        let ver = Arc::new(AtomicU64::new(1));
        {
            // Inactive, but with a fresh success timestamp — not stale yet.
            let mut map = table.lock().await;
            let key = format!("{}:{}:{}", peer.tenant, peer.cluster, peer.manager_id);
            map.insert(
                key,
                PeerState {
                    conn_state: ConnectionState::Inactive,
                    last_connect_success: Some(now_secs()),
                    ..Default::default()
                },
            );
        }
        let removed = prune_stale_peers(&table, profile, &tx, &ver, 3600).await;
        assert_eq!(removed, 0);
        // The peer must remain in persistent storage.
        let stored = crate::load_peers(profile).unwrap();
        assert_eq!(stored.len(), 1);
    }
}